Merge pull request #71162 from ClickHouse/pr-right-joins

Fix right JOINS with parallel replicas
This commit is contained in:
Igor Nikonov 2024-11-15 13:32:40 +00:00 committed by GitHub
commit c5b12d3e16
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 683 additions and 252 deletions

View File

@ -1,7 +1,6 @@
#pragma once
#include <memory>
#include <vector>
#include <Core/Names.h>
#include <Core/Block.h>

View File

@ -274,7 +274,7 @@ FiltersForTableExpressionMap collectFiltersForAnalysis(const QueryTreeNodePtr &
return res;
}
FiltersForTableExpressionMap collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options)
FiltersForTableExpressionMap collectFiltersForAnalysis(const QueryTreeNodePtr & query_tree_node, const SelectQueryOptions & select_query_options)
{
if (select_query_options.only_analyze)
return {};

View File

@ -659,6 +659,7 @@ std::unique_ptr<ExpressionStep> createComputeAliasColumnsStep(
}
JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expression,
const QueryTreeNodePtr & parent_join_tree,
const SelectQueryInfo & select_query_info,
const SelectQueryOptions & select_query_options,
PlannerContextPtr & planner_context,
@ -696,8 +697,6 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
table_expression_query_info.table_expression = table_expression;
if (const auto & filter_actions = table_expression_data.getFilterActions())
table_expression_query_info.filter_actions_dag = std::make_shared<const ActionsDAG>(filter_actions->clone());
table_expression_query_info.current_table_chosen_for_reading_with_parallel_replicas
= table_node == planner_context->getGlobalPlannerContext()->parallel_replicas_table;
size_t max_streams = settings[Setting::max_threads];
size_t max_threads_execute_query = settings[Setting::max_threads];
@ -912,21 +911,35 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
/// It is just a safety check needed until we have a proper sending plan to replicas.
/// If we have a non-trivial storage like View it might create its own Planner inside read(), run findTableForParallelReplicas()
/// and find some other table that might be used for reading with parallel replicas. It will lead to errors.
const bool other_table_already_chosen_for_reading_with_parallel_replicas
= planner_context->getGlobalPlannerContext()->parallel_replicas_table
&& !table_expression_query_info.current_table_chosen_for_reading_with_parallel_replicas;
if (other_table_already_chosen_for_reading_with_parallel_replicas)
planner_context->getMutableQueryContext()->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
storage->read(
query_plan,
columns_names,
storage_snapshot,
table_expression_query_info,
query_context,
from_stage,
max_block_size,
max_streams);
const bool no_tables_or_another_table_chosen_for_reading_with_parallel_replicas_mode
= query_context->canUseParallelReplicasOnFollower()
&& table_node != planner_context->getGlobalPlannerContext()->parallel_replicas_table;
if (no_tables_or_another_table_chosen_for_reading_with_parallel_replicas_mode)
{
auto mutable_context = Context::createCopy(query_context);
mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
storage->read(
query_plan,
columns_names,
storage_snapshot,
table_expression_query_info,
std::move(mutable_context),
from_stage,
max_block_size,
max_streams);
}
else
{
storage->read(
query_plan,
columns_names,
storage_snapshot,
table_expression_query_info,
query_context,
from_stage,
max_block_size,
max_streams);
}
auto parallel_replicas_enabled_for_storage = [](const StoragePtr & table, const Settings & query_settings)
{
@ -942,6 +955,19 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
/// query_plan can be empty if there is nothing to read
if (query_plan.isInitialized() && parallel_replicas_enabled_for_storage(storage, settings))
{
const bool allow_parallel_replicas_for_table_expression = [](const QueryTreeNodePtr & join_tree_node)
{
const JoinNode * join_node = join_tree_node->as<JoinNode>();
if (!join_node)
return true;
const auto join_kind = join_node->getKind();
if (join_kind == JoinKind::Left || join_kind == JoinKind::Right || join_kind == JoinKind::Inner)
return true;
return false;
}(parent_join_tree);
if (query_context->canUseParallelReplicasCustomKey() && query_context->getClientInfo().distributed_depth == 0)
{
if (auto cluster = query_context->getClusterForParallelReplicas();
@ -964,7 +990,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
query_plan = std::move(query_plan_parallel_replicas);
}
}
else if (ClusterProxy::canUseParallelReplicasOnInitiator(query_context))
else if (ClusterProxy::canUseParallelReplicasOnInitiator(query_context) && allow_parallel_replicas_for_table_expression)
{
// (1) find read step
QueryPlan::Node * node = query_plan.getRootNode();
@ -1794,7 +1820,8 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
const ColumnIdentifierSet & outer_scope_columns,
PlannerContextPtr & planner_context)
{
auto table_expressions_stack = buildTableExpressionsStack(query_node->as<QueryNode &>().getJoinTree());
const QueryTreeNodePtr & join_tree_node = query_node->as<QueryNode &>().getJoinTree();
auto table_expressions_stack = buildTableExpressionsStack(join_tree_node);
size_t table_expressions_stack_size = table_expressions_stack.size();
bool is_single_table_expression = table_expressions_stack_size == 1;
@ -1829,7 +1856,9 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
* Examples: Distributed, LiveView, Merge storages.
*/
auto left_table_expression = table_expressions_stack.front();
auto left_table_expression_query_plan = buildQueryPlanForTableExpression(left_table_expression,
auto left_table_expression_query_plan = buildQueryPlanForTableExpression(
left_table_expression,
join_tree_node,
select_query_info,
select_query_options,
planner_context,
@ -1902,7 +1931,9 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
* table expression in subquery.
*/
bool is_remote = planner_context->getTableExpressionDataOrThrow(table_expression).isRemote();
query_plans_stack.push_back(buildQueryPlanForTableExpression(table_expression,
query_plans_stack.push_back(buildQueryPlanForTableExpression(
table_expression,
join_tree_node,
select_query_info,
select_query_options,
planner_context,

View File

@ -23,6 +23,8 @@
#include <Storages/StorageMaterializedView.h>
#include <Storages/buildQueryTreeForShard.h>
#include <ranges>
namespace DB
{
namespace Setting
@ -38,12 +40,12 @@ namespace ErrorCodes
/// Returns a list of (sub)queries (candidates) which may support parallel replicas.
/// The rule is :
/// subquery has only LEFT or ALL INNER JOIN (or none), and left part is MergeTree table or subquery candidate as well.
/// subquery has only LEFT / RIGHT / ALL INNER JOIN (or none), and left / right part is MergeTree table or subquery candidate as well.
///
/// Additional checks are required, so we return many candidates. The innermost subquery is on top.
std::stack<const QueryNode *> getSupportingParallelReplicasQuery(const IQueryTreeNode * query_tree_node)
std::vector<const QueryNode *> getSupportingParallelReplicasQuery(const IQueryTreeNode * query_tree_node)
{
std::stack<const QueryNode *> res;
std::vector<const QueryNode *> res;
while (query_tree_node)
{
@ -75,7 +77,7 @@ std::stack<const QueryNode *> getSupportingParallelReplicasQuery(const IQueryTre
{
const auto & query_node_to_process = query_tree_node->as<QueryNode &>();
query_tree_node = query_node_to_process.getJoinTree().get();
res.push(&query_node_to_process);
res.push_back(&query_node_to_process);
break;
}
case QueryTreeNodeType::UNION:
@ -98,17 +100,16 @@ std::stack<const QueryNode *> getSupportingParallelReplicasQuery(const IQueryTre
case QueryTreeNodeType::JOIN:
{
const auto & join_node = query_tree_node->as<JoinNode &>();
auto join_kind = join_node.getKind();
auto join_strictness = join_node.getStrictness();
const auto join_kind = join_node.getKind();
const auto join_strictness = join_node.getStrictness();
bool can_parallelize_join =
join_kind == JoinKind::Left
|| (join_kind == JoinKind::Inner && join_strictness == JoinStrictness::All);
if (!can_parallelize_join)
if (join_kind == JoinKind::Left || (join_kind == JoinKind::Inner && join_strictness == JoinStrictness::All))
query_tree_node = join_node.getLeftTableExpression().get();
else if (join_kind == JoinKind::Right)
query_tree_node = join_node.getRightTableExpression().get();
else
return {};
query_tree_node = join_node.getLeftTableExpression().get();
break;
}
default:
@ -163,14 +164,27 @@ QueryTreeNodePtr replaceTablesWithDummyTables(QueryTreeNodePtr query, const Cont
return query->cloneAndReplace(visitor.replacement_map);
}
#ifdef DUMP_PARALLEL_REPLICAS_QUERY_CANDIDATES
/// Debug helper: logs every parallel replicas query candidate held in `stack`.
/// Candidates are visited from the last element of the vector to the first one.
/// For each candidate a 64-bit hash derived from its tree hash and the dump of its query tree are written to the debug log.
static void dumpStack(const std::vector<const QueryNode *> & stack)
{
    for (auto candidate_it = stack.rbegin(); candidate_it != stack.rend(); ++candidate_it)
    {
        const auto * node = *candidate_it;
        LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "{}\n{}", CityHash_v1_0_2::Hash128to64(node->getTreeHash()), node->dumpTree());
    }
}
#endif
/// Find the best candidate for parallel replicas execution by verifying query plan.
/// If query plan has only Expression, Filter of Join steps, we can execute it fully remotely and check the next query.
/// If query plan has only Expression, Filter or Join steps, we can execute it fully remotely and check the next query.
/// Otherwise we can execute current query up to WithMergableStage only.
const QueryNode * findQueryForParallelReplicas(
std::stack<const QueryNode *> stack,
std::vector<const QueryNode *> stack,
const std::unordered_map<const QueryNode *, const QueryPlan::Node *> & mapping,
const Settings & settings)
{
#ifdef DUMP_PARALLEL_REPLICAS_QUERY_CANDIDATES
dumpStack(stack);
#endif
struct Frame
{
const QueryPlan::Node * node = nullptr;
@ -189,8 +203,8 @@ const QueryNode * findQueryForParallelReplicas(
while (!stack.empty())
{
const QueryNode * const subquery_node = stack.top();
stack.pop();
const QueryNode * const subquery_node = stack.back();
stack.pop_back();
auto it = mapping.find(subquery_node);
/// This should not happen ideally.
@ -236,7 +250,7 @@ const QueryNode * findQueryForParallelReplicas(
else
{
const auto * join = typeid_cast<JoinStep *>(step);
/// We've checked that JOIN is INNER/LEFT in query tree.
/// We've checked that JOIN is INNER/LEFT/RIGHT on query tree level before.
/// Don't distribute UNION node.
if (!join)
return res;
@ -263,7 +277,7 @@ const QueryNode * findQueryForParallelReplicas(
return res;
}
const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options)
const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tree_node, const SelectQueryOptions & select_query_options)
{
if (select_query_options.only_analyze)
return nullptr;
@ -287,7 +301,7 @@ const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tr
return nullptr;
/// We don't have any subquery and storage can process parallel replicas by itself.
if (stack.top() == query_tree_node.get())
if (stack.back() == query_tree_node.get())
return nullptr;
/// This is needed to avoid infinite recursion.
@ -310,31 +324,33 @@ const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tr
const auto & mapping = planner.getQueryNodeToPlanStepMapping();
const auto * res = findQueryForParallelReplicas(new_stack, mapping, context->getSettingsRef());
/// Now, return a query from initial stack.
if (res)
{
// find query in initial stack
while (!new_stack.empty())
{
if (res == new_stack.top())
return stack.top();
if (res == new_stack.back())
{
res = stack.back();
break;
}
stack.pop();
new_stack.pop();
stack.pop_back();
new_stack.pop_back();
}
}
return res;
}
static const TableNode * findTableForParallelReplicas(const IQueryTreeNode * query_tree_node)
{
std::stack<const IQueryTreeNode *> right_join_nodes;
while (query_tree_node || !right_join_nodes.empty())
std::stack<const IQueryTreeNode *> join_nodes;
while (query_tree_node || !join_nodes.empty())
{
if (!query_tree_node)
{
query_tree_node = right_join_nodes.top();
right_join_nodes.pop();
query_tree_node = join_nodes.top();
join_nodes.pop();
}
auto join_tree_node_type = query_tree_node->getNodeType();
@ -383,8 +399,23 @@ static const TableNode * findTableForParallelReplicas(const IQueryTreeNode * que
case QueryTreeNodeType::JOIN:
{
const auto & join_node = query_tree_node->as<JoinNode &>();
query_tree_node = join_node.getLeftTableExpression().get();
right_join_nodes.push(join_node.getRightTableExpression().get());
const auto join_kind = join_node.getKind();
const auto join_strictness = join_node.getStrictness();
if (join_kind == JoinKind::Left || (join_kind == JoinKind::Inner and join_strictness == JoinStrictness::All))
{
query_tree_node = join_node.getLeftTableExpression().get();
join_nodes.push(join_node.getRightTableExpression().get());
}
else if (join_kind == JoinKind::Right)
{
query_tree_node = join_node.getRightTableExpression().get();
join_nodes.push(join_node.getLeftTableExpression().get());
}
else
{
return nullptr;
}
break;
}
default:
@ -400,7 +431,7 @@ static const TableNode * findTableForParallelReplicas(const IQueryTreeNode * que
return nullptr;
}
const TableNode * findTableForParallelReplicas(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options)
const TableNode * findTableForParallelReplicas(const QueryTreeNodePtr & query_tree_node, const SelectQueryOptions & select_query_options)
{
if (select_query_options.only_analyze)
return nullptr;

View File

@ -15,10 +15,10 @@ struct SelectQueryOptions;
/// Find a query which can be executed with parallel replicas up to WithMergableStage.
/// Returned query will always contain some (>1) subqueries, possibly with joins.
const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options);
const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tree_node, const SelectQueryOptions & select_query_options);
/// Find a table from which we should read on follower replica. It's the left-most table within all JOINs and UNIONs, except that for a RIGHT JOIN the right table expression is followed instead of the left one.
const TableNode * findTableForParallelReplicas(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options);
const TableNode * findTableForParallelReplicas(const QueryTreeNodePtr & query_tree_node, const SelectQueryOptions & select_query_options);
struct JoinTreeQueryPlan;

View File

@ -3,12 +3,15 @@
#include <Common/checkStackSize.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/Context.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Interpreters/StorageID.h>
#include <Interpreters/TableJoin.h>
#include <Parsers/ASTFunction.h>
#include <Processors/QueryPlan/ConvertingActions.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/ISourceStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/ExpressionTransform.h>
@ -62,7 +65,14 @@ std::pair<std::unique_ptr<QueryPlan>, bool> createLocalPlanForParallelReplicas(
break;
if (!node->children.empty())
node = node->children.at(0);
{
// In case of RIGHT JOIN, reading from the right table is parallelized among replicas, so descend into the right child.
const JoinStep * join = typeid_cast<JoinStep*>(node->step.get());
if (join && join->getJoin()->getTableJoin().kind() == JoinKind::Right)
node = node->children.at(1);
else
node = node->children.at(0);
}
else
node = nullptr;
}

View File

@ -162,8 +162,6 @@ struct SelectQueryInfo
/// It's guaranteed to be present in JOIN TREE of `query_tree`
QueryTreeNodePtr table_expression;
bool current_table_chosen_for_reading_with_parallel_replicas = false;
/// Table expression modifiers for storage
std::optional<TableExpressionModifiers> table_expression_modifiers;

View File

@ -276,9 +276,7 @@ void StorageMergeTree::read(
}
const bool enable_parallel_reading = local_context->canUseParallelReplicasOnFollower()
&& local_context->getSettingsRef()[Setting::parallel_replicas_for_non_replicated_merge_tree]
&& (!local_context->getSettingsRef()[Setting::allow_experimental_analyzer]
|| query_info.current_table_chosen_for_reading_with_parallel_replicas);
&& local_context->getSettingsRef()[Setting::parallel_replicas_for_non_replicated_merge_tree];
if (auto plan = reader.read(
column_names,

View File

@ -5640,10 +5640,7 @@ void StorageReplicatedMergeTree::readLocalImpl(
const size_t max_block_size,
const size_t num_streams)
{
const bool enable_parallel_reading = local_context->canUseParallelReplicasOnFollower()
&& (!local_context->getSettingsRef()[Setting::allow_experimental_analyzer]
|| query_info.current_table_chosen_for_reading_with_parallel_replicas);
const bool enable_parallel_reading = local_context->canUseParallelReplicasOnFollower();
auto plan = reader.read(
column_names, storage_snapshot, query_info,
local_context, max_block_size, num_streams,

View File

@ -314,6 +314,35 @@ TableNodePtr executeSubqueryNode(const QueryTreeNodePtr & subquery_node,
return temporary_table_expression_node;
}
/// Returns a subquery node that corresponds to the given JOIN table expression.
///
/// - QUERY and UNION nodes are returned as is.
/// - For TABLE and TABLE_FUNCTION nodes a subquery is built with buildSubqueryToReadColumnsFromTableExpression,
///   using the columns stored for this table expression in `column_source_to_columns`.
/// - Any other node type results in a LOGICAL_ERROR exception.
QueryTreeNodePtr getSubqueryFromTableExpression(
    const QueryTreeNodePtr & join_table_expression,
    const std::unordered_map<QueryTreeNodePtr, CollectColumnSourceToColumnsVisitor::Columns> & column_source_to_columns,
    const ContextPtr & context)
{
    const auto node_type = join_table_expression->getNodeType();

    /// The expression is already a (sub)query, nothing to build.
    const bool is_query_or_union = node_type == QueryTreeNodeType::QUERY || node_type == QueryTreeNodeType::UNION;
    if (is_query_or_union)
        return join_table_expression;

    const bool is_table_or_table_function = node_type == QueryTreeNodeType::TABLE || node_type == QueryTreeNodeType::TABLE_FUNCTION;
    if (!is_table_or_table_function)
    {
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "Expected JOIN table expression to be table, table function, query or union node. Actual {}",
            join_table_expression->formatASTForErrorMessage());
    }

    /// The table expression must have an entry in the map, `at` throws otherwise.
    const auto & columns_to_read = column_source_to_columns.at(join_table_expression).columns;
    return buildSubqueryToReadColumnsFromTableExpression(columns_to_read, join_table_expression, context);
}
}
QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify)
@ -335,37 +364,31 @@ QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_contex
{
if (auto * join_node = global_in_or_join_node.query_node->as<JoinNode>())
{
auto join_right_table_expression = join_node->getRightTableExpression();
auto join_right_table_expression_node_type = join_right_table_expression->getNodeType();
QueryTreeNodePtr subquery_node;
if (join_right_table_expression_node_type == QueryTreeNodeType::QUERY ||
join_right_table_expression_node_type == QueryTreeNodeType::UNION)
QueryTreeNodePtr join_table_expression;
const auto join_kind = join_node->getKind();
if (join_kind == JoinKind::Left || join_kind == JoinKind::Inner)
{
subquery_node = join_right_table_expression;
join_table_expression = join_node->getRightTableExpression();
}
else if (join_right_table_expression_node_type == QueryTreeNodeType::TABLE ||
join_right_table_expression_node_type == QueryTreeNodeType::TABLE_FUNCTION)
else if (join_kind == JoinKind::Right)
{
const auto & columns = column_source_to_columns.at(join_right_table_expression).columns;
subquery_node = buildSubqueryToReadColumnsFromTableExpression(columns,
join_right_table_expression,
planner_context->getQueryContext());
join_table_expression = join_node->getLeftTableExpression();
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Expected JOIN right table expression to be table, table function, query or union node. Actual {}",
join_right_table_expression->formatASTForErrorMessage());
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Unexpected join kind: {}", join_kind);
}
auto subquery_node
= getSubqueryFromTableExpression(join_table_expression, column_source_to_columns, planner_context->getQueryContext());
auto temporary_table_expression_node = executeSubqueryNode(subquery_node,
planner_context->getMutableQueryContext(),
global_in_or_join_node.subquery_depth);
temporary_table_expression_node->setAlias(join_right_table_expression->getAlias());
temporary_table_expression_node->setAlias(join_table_expression->getAlias());
replacement_map.emplace(join_right_table_expression.get(), std::move(temporary_table_expression_node));
replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node));
continue;
}
if (auto * in_function_node = global_in_or_join_node.query_node->as<FunctionNode>())

View File

@ -1,5 +1,5 @@
-- Tags: zookeeper
DROP TABLE IF EXISTS join_inner_table__fuzz_146_replicated;
DROP TABLE IF EXISTS join_inner_table__fuzz_146_replicated SYNC;
CREATE TABLE join_inner_table__fuzz_146_replicated
(
`id` UUID,
@ -52,4 +52,4 @@ WHERE
GROUP BY is_initial_query, query
ORDER BY is_initial_query DESC, c, query;
DROP TABLE join_inner_table__fuzz_146_replicated;
DROP TABLE join_inner_table__fuzz_146_replicated SYNC;

View File

@ -2,7 +2,7 @@
set parallel_replicas_prefer_local_join = 0;
-- A query with only INNER/LEFT joins is fully send to replicas. JOIN is executed in GLOBAL mode.
select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -18,7 +18,7 @@ select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x
13 13 0 0 0 0
14 14 14 14 0 0
15 15 0 0 0 0
explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z;
Union
Expression
Join
@ -40,8 +40,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -63,8 +62,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
Expression
Sorting
Union
@ -90,8 +88,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by key;
54 54 50 50 12 12 0
64 64 0 0 0 0 1
explain description=0
@ -100,8 +97,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by key;
Expression
Sorting
Expression
@ -129,8 +125,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -152,8 +147,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
Expression
Sorting
Expression
@ -181,8 +175,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -204,8 +197,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
Expression
Sorting
Expression
@ -237,8 +229,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5;
0 0 0 0 0 0
6 6 6 6 0 0
8 8 8 8 0 0
@ -260,30 +251,21 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Expression
Join
Expression
Join
Union
select * from sub5;
Union
Expression
Join
Expression
Join
Expression
ReadFromMemoryStorage
Expression
Expression
ReadFromMergeTree
Expression
ReadFromRemoteParallelReplicas
Union
Expression
Expression
ReadFromMergeTree
Expression
ReadFromRemoteParallelReplicas
Union
Expression
Expression
ReadFromMergeTree
Expression
ReadFromRemoteParallelReplicas
ReadFromMemoryStorage
Expression
ReadFromRemoteParallelReplicas
--
-- RIGHT JOIN in sub5: sub5 -> WithMergableStage
with sub1 as (select x, y from tab1 where x != 2),
@ -291,7 +273,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
0 0 0 0 0 0
0 0 1 1 0 0
0 0 3 3 0 0
@ -313,31 +295,26 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
Expression
Sorting
Expression
Join
Union
Union
Expression
Sorting
Expression
Expression
ReadFromMergeTree
Expression
ReadFromRemoteParallelReplicas
Expression
Join
Union
Join
Expression
Expression
ReadFromMergeTree
ReadFromMemoryStorage
Expression
ReadFromRemoteParallelReplicas
Union
Expression
Expression
ReadFromMergeTree
Expression
ReadFromRemoteParallelReplicas
Join
Expression
Expression
ReadFromMergeTree
Expression
Expression
ReadFromMergeTree
Expression
ReadFromRemoteParallelReplicas
--
-- Subqueries for IN allowed
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
@ -345,8 +322,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -368,8 +344,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
Expression
Sorting
Union
@ -402,7 +377,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;
SETTINGS enable_parallel_replicas = 1, parallel_replicas_allow_in_with_subquery = 0;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -425,7 +400,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;-- { echoOn }
SETTINGS enable_parallel_replicas = 1, parallel_replicas_allow_in_with_subquery = 0;-- { echoOn }
Expression
Sorting
Expression
@ -455,7 +430,7 @@ Expression
ReadFromRemoteParallelReplicas
set parallel_replicas_prefer_local_join = 1;
-- A query with only INNER/LEFT joins is fully send to replicas. JOIN is executed in GLOBAL mode.
select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -471,7 +446,7 @@ select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x
13 13 0 0 0 0
14 14 14 14 0 0
15 15 0 0 0 0
explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z;
Union
Expression
Join
@ -495,8 +470,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -518,8 +492,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
Expression
Sorting
Union
@ -547,8 +520,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by key;
54 54 50 50 12 12 0
64 64 0 0 0 0 1
explain description=0
@ -557,8 +529,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by key;
Expression
Sorting
Expression
@ -588,8 +559,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -611,8 +581,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
Expression
Sorting
Expression
@ -641,8 +610,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -664,8 +632,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
Expression
Sorting
Expression
@ -697,8 +664,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5;
0 0 0 0 0 0
6 6 6 6 0 0
8 8 8 8 0 0
@ -720,30 +686,23 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
Expression
Join
Expression
Join
Union
select * from sub5;
Union
Expression
Join
Expression
Join
Expression
Expression
ReadFromMergeTree
Expression
ReadFromRemoteParallelReplicas
Union
Expression
Expression
ReadFromMergeTree
Expression
ReadFromRemoteParallelReplicas
Union
Expression
Expression
ReadFromMergeTree
Expression
ReadFromRemoteParallelReplicas
Expression
ReadFromRemoteParallelReplicas
--
-- RIGHT JOIN in sub5: sub5 -> WithMergableStage
with sub1 as (select x, y from tab1 where x != 2),
@ -751,7 +710,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
0 0 0 0 0 0
0 0 1 1 0 0
0 0 3 3 0 0
@ -773,31 +732,27 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
Expression
Sorting
Expression
Join
Union
Union
Expression
Sorting
Expression
Expression
ReadFromMergeTree
Expression
ReadFromRemoteParallelReplicas
Expression
Join
Union
Join
Expression
Expression
ReadFromMergeTree
Expression
ReadFromRemoteParallelReplicas
Union
Expression
Expression
ReadFromMergeTree
Expression
ReadFromRemoteParallelReplicas
Join
Expression
Expression
ReadFromMergeTree
Expression
Expression
ReadFromMergeTree
Expression
ReadFromRemoteParallelReplicas
--
-- Subqueries for IN allowed
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
@ -805,8 +760,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -828,8 +782,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
Expression
Sorting
Union
@ -864,7 +817,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;
SETTINGS enable_parallel_replicas = 1, parallel_replicas_allow_in_with_subquery = 0;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
@ -887,7 +840,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;
SETTINGS enable_parallel_replicas = 1, parallel_replicas_allow_in_with_subquery = 0;
Expression
Sorting
Expression

View File

@ -1,16 +1,17 @@
drop table if exists tab1;
drop table if exists tab2;
drop table if exists tab3;
drop table if exists tab1 sync;
drop table if exists tab2 sync;
drop table if exists tab3 sync;
create table tab1 (x UInt32, y UInt32, shard UInt32) engine = MergeTree order by shard;
create table tab2 (y UInt32, z UInt32) engine = MergeTree order by tuple();
create table tab3 (z UInt32, a UInt32) engine = MergeTree order by tuple();
create table tab1 (x UInt32, y UInt32, shard UInt32) engine = ReplicatedMergeTree('/clickhouse/tables/{database}/test_02967/tab1', 'r1') order by shard;
create table tab2 (y UInt32, z UInt32) engine = ReplicatedMergeTree('/clickhouse/tables/{database}/test_02967/tab2', 'r1') order by tuple();
create table tab3 (z UInt32, a UInt32) engine = ReplicatedMergeTree('/clickhouse/tables/{database}/test_02967/tab3', 'r1') order by tuple();
insert into tab1 select number, number, number from numbers(16);
insert into tab2 select number * 2, number * 2 from numbers(8);
insert into tab3 select number * 4, number * 4 from numbers(4);
set parallel_replicas_local_plan=1;
set enable_analyzer = 1;
set enable_parallel_replicas = 2, max_parallel_replicas = 2, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_local_plan = 1;
{% for use_global_in in [0, 1] -%}
@ -19,8 +20,9 @@ set parallel_replicas_local_plan=1;
set parallel_replicas_prefer_local_join = {{use_global_in}};
-- A query with only INNER/LEFT joins is fully send to replicas. JOIN is executed in GLOBAL mode.
select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x;
explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z;
--
-- The same query with cte;
with sub1 as (select x, y from tab1 where x != 2),
@ -28,8 +30,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
@ -37,8 +38,8 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
--
-- GROUP BY should work up to WithMergableStage
with sub1 as (select x, y from tab1 where x != 2),
@ -46,8 +47,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by key;
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
@ -55,8 +55,8 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by key;
--
-- ORDER BY in sub3 : sub1 is fully pushed, sub3 -> WithMergableStage
with sub1 as (select x, y from tab1 where x != 2),
@ -64,8 +64,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
@ -73,8 +72,8 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
--
-- ORDER BY in sub1 : sub1 -> WithMergableStage
with sub1 as (select x, y from tab1 where x != 2 order by y),
@ -82,8 +81,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
explain description=0
with sub1 as (select x, y from tab1 where x != 2 order by y),
@ -91,8 +89,8 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
--
-- RIGHT JOIN in sub3: sub3 -> WithMergableStage
with sub1 as (select x, y from tab1 where x != 2),
@ -100,8 +98,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5;
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
@ -109,8 +106,8 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5;
--
-- RIGHT JOIN in sub5: sub5 -> WithMergableStage
with sub1 as (select x, y from tab1 where x != 2),
@ -118,7 +115,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
@ -126,7 +123,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
--
-- Subqueries for IN allowed
@ -135,8 +132,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
explain description=0
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
@ -144,8 +140,7 @@ sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1;
select * from sub5 order by x;
--
-- Subqueries for IN are not allowed
@ -155,7 +150,7 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;
SETTINGS enable_parallel_replicas = 1, parallel_replicas_allow_in_with_subquery = 0;
explain description=0
with sub1 as (select x, y from tab1 where x in (select number from numbers(16) where number != 2)),
@ -164,6 +159,6 @@ sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS enable_parallel_replicas = 1, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', enable_analyzer=1, parallel_replicas_allow_in_with_subquery=0;
SETTINGS enable_parallel_replicas = 1, parallel_replicas_allow_in_with_subquery = 0;
{%- endfor %}

View File

@ -2,6 +2,7 @@
SET enable_analyzer=1;
SET distributed_foreground_insert=1;
DROP TABLE IF EXISTS first_table_lr SYNC;
CREATE TABLE first_table_lr
(
id String,
@ -11,6 +12,7 @@ ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test_03080/alter', '
ORDER BY id;
DROP TABLE IF EXISTS first_table;
CREATE TABLE first_table
(
id String,
@ -19,6 +21,7 @@ CREATE TABLE first_table
ENGINE = Distributed('test_shard_localhost', currentDatabase(), 'first_table_lr');
DROP TABLE IF EXISTS second_table_lr;
CREATE TABLE second_table_lr
(
id String,
@ -26,6 +29,7 @@ CREATE TABLE second_table_lr
) ENGINE = MergeTree()
ORDER BY id;
DROP TABLE IF EXISTS second_table;
CREATE TABLE second_table
(
id String,
@ -36,6 +40,7 @@ ENGINE = Distributed('test_shard_localhost', currentDatabase(), 'second_table_lr
INSERT INTO first_table VALUES ('1', '2'), ('3', '4');
INSERT INTO second_table VALUES ('1', '2'), ('3', '4');
DROP TABLE IF EXISTS two_tables;
CREATE TABLE two_tables
(
id String,

View File

@ -6,12 +6,15 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
$CLICKHOUSE_CLIENT -q "
DROP TABLE IF EXISTS ids;
CREATE TABLE ids (id UUID, whatever String) Engine=MergeTree ORDER BY tuple();
INSERT INTO ids VALUES ('a1451105-722e-4fe7-bfaa-65ad2ae249c2', 'whatever');
DROP TABLE IF EXISTS data;
CREATE TABLE data (id UUID, event_time DateTime, status String) Engine=MergeTree ORDER BY tuple();
INSERT INTO data VALUES ('a1451105-722e-4fe7-bfaa-65ad2ae249c2', '2000-01-01', 'CREATED');
DROP TABLE IF EXISTS data2;
CREATE TABLE data2 (id UUID, event_time DateTime, status String) Engine=MergeTree ORDER BY tuple();
INSERT INTO data2 VALUES ('a1451105-722e-4fe7-bfaa-65ad2ae249c2', '2000-01-02', 'CREATED');
"

View File

@ -0,0 +1,273 @@
inner
1 l1 1 1 r1 \N
1 l1 1 1 r2 \N
2 l2 2 2 r3 \N
2 l3 3 2 r3 \N
3 l4 4 3 r4 \N
3 l4 4 3 r5 \N
4 l5 \N 4 r6 nr6
4 l6 \N 4 r6 nr6
9 l9 \N 9 r9 nr9
inner subs
1 l1 1 1 r1 \N
1 l1 1 1 r2 \N
2 l2 2 2 r3 \N
2 l3 3 2 r3 \N
3 l4 4 3 r4 \N
3 l4 4 3 r5 \N
4 l5 \N 4 r6 nr6
4 l6 \N 4 r6 nr6
9 l9 \N 9 r9 nr9
inner expr
1 l1 1 1 r1 \N
1 l1 1 1 r2 \N
2 l2 2 2 r3 \N
2 l3 3 2 r3 \N
3 l4 4 3 r4 \N
3 l4 4 3 r5 \N
4 l5 \N 4 r6 nr6
4 l6 \N 4 r6 nr6
9 l9 \N 9 r9 nr9
left
1 l1 1 1 r1 \N
1 l1 1 1 r2 \N
2 l2 2 2 r3 \N
2 l3 3 2 r3 \N
3 l4 4 3 r4 \N
3 l4 4 3 r5 \N
4 l5 \N 4 r6 nr6
4 l6 \N 4 r6 nr6
5 l7 \N 0 \N
8 l8 \N 0 \N
9 l9 \N 9 r9 nr9
left subs
1 l1 1 1 r1 \N
1 l1 1 1 r2 \N
2 l2 2 2 r3 \N
2 l3 3 2 r3 \N
3 l4 4 3 r4 \N
3 l4 4 3 r5 \N
4 l5 \N 4 r6 nr6
4 l6 \N 4 r6 nr6
5 l7 \N 0 \N
8 l8 \N 0 \N
9 l9 \N 9 r9 nr9
left expr
1 l1 1 1 r1 \N
1 l1 1 1 r2 \N
2 l2 2 2 r3 \N
2 l3 3 2 r3 \N
3 l4 4 3 r4 \N
3 l4 4 3 r5 \N
4 l5 \N 4 r6 nr6
4 l6 \N 4 r6 nr6
5 l7 \N 0 \N
8 l8 \N 0 \N
9 l9 \N 9 r9 nr9
right
0 \N 6 r7 nr7
0 \N 7 r8 nr8
1 l1 1 1 r1 \N
1 l1 1 1 r2 \N
2 l2 2 2 r3 \N
2 l3 3 2 r3 \N
3 l4 4 3 r4 \N
3 l4 4 3 r5 \N
4 l5 \N 4 r6 nr6
4 l6 \N 4 r6 nr6
9 l9 \N 9 r9 nr9
right subs
0 \N 6 r7 nr7
0 \N 7 r8 nr8
1 l1 1 1 r1 \N
1 l1 1 1 r2 \N
2 l2 2 2 r3 \N
2 l3 3 2 r3 \N
3 l4 4 3 r4 \N
3 l4 4 3 r5 \N
4 l5 \N 4 r6 nr6
4 l6 \N 4 r6 nr6
9 l9 \N 9 r9 nr9
full
0 \N 6 r7 nr7
0 \N 7 r8 nr8
1 l1 1 1 r1 \N
1 l1 1 1 r2 \N
2 l2 2 2 r3 \N
2 l3 3 2 r3 \N
3 l4 4 3 r4 \N
3 l4 4 3 r5 \N
4 l5 \N 4 r6 nr6
4 l6 \N 4 r6 nr6
5 l7 \N 0 \N
8 l8 \N 0 \N
9 l9 \N 9 r9 nr9
full subs
0 \N 6 r7 nr7
0 \N 7 r8 nr8
1 l1 1 1 r1 \N
1 l1 1 1 r2 \N
2 l2 2 2 r3 \N
2 l3 3 2 r3 \N
3 l4 4 3 r4 \N
3 l4 4 3 r5 \N
4 l5 \N 4 r6 nr6
4 l6 \N 4 r6 nr6
5 l7 \N 0 \N
8 l8 \N 0 \N
9 l9 \N 9 r9 nr9
self inner
1 l1 1 1 l1 1
2 l2 2 2 l2 2
2 l2 2 2 l3 3
2 l3 3 2 l2 2
2 l3 3 2 l3 3
3 l4 4 3 l4 4
4 l5 \N 4 l5 \N
4 l5 \N 4 l6 \N
4 l6 \N 4 l5 \N
4 l6 \N 4 l6 \N
5 l7 \N 5 l7 \N
8 l8 \N 8 l8 \N
9 l9 \N 9 l9 \N
self inner nullable
1 l1 1 1 l1 1
2 l2 2 2 l2 2
2 l3 3 2 l3 3
3 l4 4 3 l4 4
self inner nullable vs not nullable
1 l1 1 1 l1 1
2 l2 2 2 l2 2
2 l3 3 2 l2 2
3 l4 4 2 l3 3
4 l5 \N 3 l4 4
4 l6 \N 3 l4 4
self inner nullable vs not nullable 2
4 r6 nr6 4 r6 nr6
6 r7 nr7 6 r7 nr7
7 r8 nr8 7 r8 nr8
9 r9 nr9 9 r9 nr9
self left
1 l1 1 1 l1 1
2 l2 2 2 l2 2
2 l2 2 2 l3 3
2 l3 3 2 l2 2
2 l3 3 2 l3 3
3 l4 4 3 l4 4
4 l5 \N 4 l5 \N
4 l5 \N 4 l6 \N
4 l6 \N 4 l5 \N
4 l6 \N 4 l6 \N
5 l7 \N 5 l7 \N
8 l8 \N 8 l8 \N
9 l9 \N 9 l9 \N
self left nullable
1 l1 1 1 l1 1
2 l2 2 2 l2 2
2 l3 3 2 l3 3
3 l4 4 3 l4 4
4 l5 \N 0 \N
4 l6 \N 0 \N
5 l7 \N 0 \N
8 l8 \N 0 \N
9 l9 \N 0 \N
self left nullable vs not nullable
1 l1 1 1 l1 1
2 l2 2 2 l2 2
2 l3 3 2 l2 2
3 l4 4 2 l3 3
4 l5 \N 3 l4 4
4 l6 \N 3 l4 4
5 l7 \N 0 \N
8 l8 \N 0 \N
9 l9 \N 0 \N
self left nullable vs not nullable 2
1 r1 \N 0 \N
1 r2 \N 0 \N
2 r3 \N 0 \N
3 r4 \N 0 \N
3 r5 \N 0 \N
4 r6 nr6 4 r6 nr6
6 r7 nr7 6 r7 nr7
7 r8 nr8 7 r8 nr8
9 r9 nr9 9 r9 nr9
self right
1 l1 1 1 l1 1
2 l2 2 2 l2 2
2 l2 2 2 l3 3
2 l3 3 2 l2 2
2 l3 3 2 l3 3
3 l4 4 3 l4 4
4 l5 \N 4 l5 \N
4 l5 \N 4 l6 \N
4 l6 \N 4 l5 \N
4 l6 \N 4 l6 \N
5 l7 \N 5 l7 \N
8 l8 \N 8 l8 \N
9 l9 \N 9 l9 \N
self right nullable
0 \N 4 l5 \N
0 \N 4 l6 \N
0 \N 5 l7 \N
0 \N 8 l8 \N
0 \N 9 l9 \N
1 l1 1 1 l1 1
2 l2 2 2 l2 2
2 l3 3 2 l3 3
3 l4 4 3 l4 4
self right nullable vs not nullable
0 \N 4 l5 \N
0 \N 4 l6 \N
0 \N 5 l7 \N
0 \N 8 l8 \N
0 \N 9 l9 \N
1 l1 1 1 l1 1
2 l2 2 2 l2 2
2 l3 3 2 l2 2
3 l4 4 2 l3 3
4 l5 \N 3 l4 4
4 l6 \N 3 l4 4
self full
1 l1 1 1 l1 1
2 l2 2 2 l2 2
2 l2 2 2 l3 3
2 l3 3 2 l2 2
2 l3 3 2 l3 3
3 l4 4 3 l4 4
4 l5 \N 4 l5 \N
4 l5 \N 4 l6 \N
4 l6 \N 4 l5 \N
4 l6 \N 4 l6 \N
5 l7 \N 5 l7 \N
8 l8 \N 8 l8 \N
9 l9 \N 9 l9 \N
self full nullable
0 \N 4 l5 \N
0 \N 4 l6 \N
0 \N 5 l7 \N
0 \N 8 l8 \N
0 \N 9 l9 \N
1 l1 1 1 l1 1
2 l2 2 2 l2 2
2 l3 3 2 l3 3
3 l4 4 3 l4 4
4 l5 \N 0 \N
4 l6 \N 0 \N
5 l7 \N 0 \N
8 l8 \N 0 \N
9 l9 \N 0 \N
self full nullable vs not nullable
0 \N 4 l5 \N
0 \N 4 l6 \N
0 \N 5 l7 \N
0 \N 8 l8 \N
0 \N 9 l9 \N
1 l1 1 1 l1 1
2 l2 2 2 l2 2
2 l3 3 2 l2 2
3 l4 4 2 l3 3
4 l5 \N 3 l4 4
4 l6 \N 3 l4 4
5 l7 \N 0 \N
8 l8 \N 0 \N
9 l9 \N 0 \N

View File

@ -0,0 +1,73 @@
-- Runs INNER / LEFT / RIGHT / FULL joins between tables X and Y, plus self joins of X and Y,
-- with the analyzer and parallel replicas enabled (see the `set` statement below).
-- Every query is preceded by a `select '<label>'` so that each result section can be
-- identified in the output, and is ordered by all selected columns.

-- Start from a clean state.
drop table if exists X sync;
drop table if exists Y sync;
set min_bytes_to_use_direct_io = 0; -- min_bytes_to_use_direct_io > 0 is broken and leads to unexpected results, https://github.com/ClickHouse/ClickHouse/issues/65690
-- Fixture tables: a non-nullable `id` key plus one nullable column per table (x_b / y_b).
create table X (id Int32, x_a String, x_b Nullable(Int32)) engine ReplicatedMergeTree('/clickhouse/{database}/X', '1') order by tuple();
create table Y (id Int32, y_a String, y_b Nullable(String)) engine ReplicatedMergeTree('/clickhouse/{database}/Y', '1') order by tuple();
-- X: the first insert supplies x_b, the second omits it (ids 4, 5, 8, 9).
insert into X (id, x_a, x_b) values (1, 'l1', 1), (2, 'l2', 2), (2, 'l3', 3), (3, 'l4', 4);
insert into X (id, x_a) values (4, 'l5'), (4, 'l6'), (5, 'l7'), (8, 'l8'), (9, 'l9');
-- Y: the first insert omits y_b (ids 1, 2, 3), the second supplies it (ids 4, 6, 7, 9).
insert into Y (id, y_a) values (1, 'r1'), (1, 'r2'), (2, 'r3'), (3, 'r4'), (3, 'r5');
insert into Y (id, y_a, y_b) values (4, 'r6', 'nr6'), (6, 'r7', 'nr7'), (7, 'r8', 'nr8'), (9, 'r9', 'nr9');
-- Settings under test: all queries below run with parallel replicas on a 3-replica localhost cluster.
set enable_analyzer = 1, enable_parallel_replicas = 1, max_parallel_replicas = 3, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';
-- X joined with Y. Variants: plain tables, both sides wrapped in subqueries ('subs'),
-- and join keys given as expressions ('expr').
select 'inner';
select X.*, Y.* from X inner join Y on X.id = Y.id order by X.id, X.x_a, X.x_b, Y.id, Y.y_a, Y.y_b;
select 'inner subs';
select s.*, j.* from (select * from X) as s inner join (select * from Y) as j on s.id = j.id order by s.id, s.x_a, s.x_b, j.id, j.y_a, j.y_b;
select 'inner expr';
select X.*, Y.* from X inner join Y on (X.id + 1) = (Y.id + 1) order by X.id, X.x_a, X.x_b, Y.id, Y.y_a, Y.y_b;
select 'left';
select X.*, Y.* from X left join Y on X.id = Y.id order by X.id, X.x_a, X.x_b, Y.id, Y.y_a, Y.y_b;
select 'left subs';
select s.*, j.* from (select * from X) as s left join (select * from Y) as j on s.id = j.id order by s.id, s.x_a, s.x_b, j.id, j.y_a, j.y_b;
select 'left expr';
select X.*, Y.* from X left join Y on (X.id + 1) = (Y.id + 1) order by X.id, X.x_a, X.x_b, Y.id, Y.y_a, Y.y_b;
select 'right';
select X.*, Y.* from X right join Y on X.id = Y.id order by X.id, X.x_a, X.x_b, Y.id, Y.y_a, Y.y_b;
select 'right subs';
select s.*, j.* from (select * from X) as s right join (select * from Y) as j on s.id = j.id order by s.id, s.x_a, s.x_b, j.id, j.y_a, j.y_b;
select 'full';
select X.*, Y.* from X full join Y on X.id = Y.id order by X.id, X.x_a, X.x_b, Y.id, Y.y_a, Y.y_b;
select 'full subs';
select s.*, j.* from (select * from X) as s full join (select * from Y) as j on s.id = j.id order by s.id, s.x_a, s.x_b, j.id, j.y_a, j.y_b;
-- Self joins: the table on the left, a subquery over the same table on the right.
-- Key variants: non-nullable id on both sides (no suffix), nullable x_b on both sides ('nullable'),
-- non-nullable id against nullable x_b ('nullable vs not nullable'), and a String expression
-- over Y.y_a against nullable y_b ('nullable vs not nullable 2').
select 'self inner';
select X.*, s.* from X inner join (select * from X) as s on X.id = s.id order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b;
select 'self inner nullable';
select X.*, s.* from X inner join (select * from X) as s on X.x_b = s.x_b order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b;
select 'self inner nullable vs not nullable';
select X.*, s.* from X inner join (select * from X) as s on X.id = s.x_b order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b;
select 'self inner nullable vs not nullable 2';
select Y.*, s.* from Y inner join (select * from Y) as s on concat('n', Y.y_a) = s.y_b order by Y.id, Y.y_a, Y.y_b, s.id, s.y_a, s.y_b;
select 'self left';
select X.*, s.* from X left join (select * from X) as s on X.id = s.id order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b;
select 'self left nullable';
select X.*, s.* from X left join (select * from X) as s on X.x_b = s.x_b order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b;
select 'self left nullable vs not nullable';
select X.*, s.* from X left join (select * from X) as s on X.id = s.x_b order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b;
select 'self left nullable vs not nullable 2';
select Y.*, s.* from Y left join (select * from Y) as s on concat('n', Y.y_a) = s.y_b order by Y.id, Y.y_a, Y.y_b, s.id, s.y_a, s.y_b;
select 'self right';
select X.*, s.* from X right join (select * from X) as s on X.id = s.id order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b;
select 'self right nullable';
select X.*, s.* from X right join (select * from X) as s on X.x_b = s.x_b order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b;
select 'self right nullable vs not nullable';
select X.*, s.* from X right join (select * from X) as s on X.id = s.x_b order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b;
select 'self full';
select X.*, s.* from X full join (select * from X) as s on X.id = s.id order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b;
select 'self full nullable';
select X.*, s.* from X full join (select * from X) as s on X.x_b = s.x_b order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b;
select 'self full nullable vs not nullable';
select X.*, s.* from X full join (select * from X) as s on X.id = s.x_b order by X.id, X.x_a, X.x_b, s.id, s.x_a, s.x_b;
-- Cleanup.
drop table X sync;
drop table Y sync;

View File

@ -0,0 +1,16 @@
semi left
2 a3 2 b1
2 a6 2 b1
4 a5 4 b3
semi right
2 a3 2 b1
2 a3 2 b2
4 a5 4 b3
4 a5 4 b4
4 a5 4 b5
anti left
0 a1 0
1 a2 1
3 a4 3
anti right
0 5 b6

View File

@ -0,0 +1,26 @@
-- Runs SEMI and ANTI joins (LEFT and RIGHT flavours) between t1 and t2 with the analyzer
-- and parallel replicas enabled. Each query is preceded by a `SELECT '<label>'` that marks
-- its section in the output.

-- Start from a clean state.
DROP TABLE IF EXISTS t1 SYNC;
DROP TABLE IF EXISTS t2 SYNC;
-- Fixture tables with identical schemas; `x` is the join key.
CREATE TABLE t1 (x UInt32, s String) engine ReplicatedMergeTree('/clickhouse/{database}/t1', '1') order by tuple();
CREATE TABLE t2 (x UInt32, s String) engine ReplicatedMergeTree('/clickhouse/{database}/t2', '1') order by tuple();
-- Keys 2 and 4 occur in both tables; 0, 1, 3 occur only in t1 and 5 only in t2.
INSERT INTO t1 (x, s) VALUES (0, 'a1'), (1, 'a2'), (2, 'a3'), (3, 'a4'), (4, 'a5'), (2, 'a6');
INSERT INTO t2 (x, s) VALUES (2, 'b1'), (2, 'b2'), (4, 'b3'), (4, 'b4'), (4, 'b5'), (5, 'b6');
-- NOTE(review): with join_use_nulls = 0 the columns of the non-joined side are expected to be
-- filled with type defaults rather than NULL — confirm against the settings documentation.
SET join_use_nulls = 0;
-- Settings under test: all queries below run with parallel replicas on a 3-replica localhost cluster.
set enable_analyzer = 1, enable_parallel_replicas = 1, max_parallel_replicas = 3, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost';
SELECT 'semi left';
SELECT t1.*, t2.* FROM t1 SEMI LEFT JOIN t2 USING(x) ORDER BY t1.x, t2.x, t1.s, t2.s;
SELECT 'semi right';
SELECT t1.*, t2.* FROM t1 SEMI RIGHT JOIN t2 USING(x) ORDER BY t1.x, t2.x, t1.s, t2.s;
SELECT 'anti left';
SELECT t1.*, t2.* FROM t1 ANTI LEFT JOIN t2 USING(x) ORDER BY t1.x, t2.x, t1.s, t2.s;
SELECT 'anti right';
SELECT t1.*, t2.* FROM t1 ANTI RIGHT JOIN t2 USING(x) ORDER BY t1.x, t2.x, t1.s, t2.s;
-- Cleanup.
DROP TABLE t1 SYNC;
DROP TABLE t2 SYNC;