Merge pull request #58916 from ClickHouse/allow-parallel-replicas-for-join-with-analyzer-2

Allow parallel replicas for JOIN with analyzer [part 2]
Nikolai Kochetov 2024-02-14 13:30:35 +01:00 committed by GitHub
commit ebf47dd7c2
GPG Key ID: B5690EEEBB952194
23 changed files with 1722 additions and 32 deletions

src/Core/Settings.h

@@ -40,6 +40,8 @@ class IColumn;
M(UInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough.", 0) \
M(UInt64, min_insert_block_size_rows_for_materialized_views, 0, "Like min_insert_block_size_rows, but applied only during pushing to MATERIALIZED VIEW (default: min_insert_block_size_rows)", 0) \
M(UInt64, min_insert_block_size_bytes_for_materialized_views, 0, "Like min_insert_block_size_bytes, but applied only during pushing to MATERIALIZED VIEW (default: min_insert_block_size_bytes)", 0) \
M(UInt64, min_external_table_block_size_rows, DEFAULT_INSERT_BLOCK_SIZE, "Squash blocks passed to external table to specified size in rows, if blocks are not big enough.", 0) \
M(UInt64, min_external_table_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Squash blocks passed to external table to specified size in bytes, if blocks are not big enough.", 0) \
M(UInt64, max_joined_block_size_rows, DEFAULT_BLOCK_SIZE, "Maximum block size for JOIN result (if join algorithm supports it). 0 means unlimited.", 0) \
M(UInt64, max_insert_threads, 0, "The maximum number of threads to execute the INSERT SELECT query. Values 0 or 1 means that INSERT SELECT is not run in parallel. Higher values will lead to higher memory usage. Parallel INSERT SELECT has effect only if the SELECT part is run on parallel, see 'max_threads' setting.", 0) \
M(UInt64, max_insert_delayed_streams_for_parallel_write, 0, "The maximum number of streams (columns) to delay final part flush. Default - auto (1000 in case of underlying storage supports parallel write, for example S3 and disabled otherwise)", 0) \
@@ -186,6 +188,7 @@ class IColumn;
M(Float, parallel_replicas_single_task_marks_count_multiplier, 2, "A multiplier which will be added during calculation for minimal number of marks to retrieve from coordinator. This will be applied only for remote replicas.", 0) \
M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \
M(UInt64, parallel_replicas_min_number_of_rows_per_replica, 0, "Limit the number of replicas used in a query to (estimated rows to read / min_number_of_rows_per_replica). The max is still limited by 'max_parallel_replicas'", 0) \
M(Bool, parallel_replicas_prefer_local_join, true, "If true, and JOIN can be executed with parallel replicas algorithm, and all storages of right JOIN part are *MergeTree, local JOIN will be used instead of GLOBAL JOIN.", 0) \
M(UInt64, parallel_replicas_mark_segment_size, 128, "Parts virtually divided into segments to be distributed between replicas for parallel reading. This setting controls the size of these segments. Not recommended to change until you're absolutely sure in what you're doing", 0) \
\
M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards. Shard is marked as unavailable when: 1) The shard cannot be reached due to a connection failure. 2) Shard is unresolvable through DNS. 3) Table does not exist on the shard.", 0) \

src/Core/SettingsChangesHistory.h

@@ -96,6 +96,9 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"split_parts_ranges_into_intersecting_and_non_intersecting_final", true, true, "Allow to split parts ranges into intersecting and non intersecting during FINAL optimization"},
{"split_intersecting_parts_ranges_into_layers_final", true, true, "Allow to split intersecting parts ranges into layers during FINAL optimization"},
{"azure_max_single_part_copy_size", 256*1024*1024, 256*1024*1024, "The maximum size of object to copy using single part copy to Azure blob storage."},
{"min_external_table_block_size_rows", DEFAULT_INSERT_BLOCK_SIZE, DEFAULT_INSERT_BLOCK_SIZE, "Squash blocks passed to external table to specified size in rows, if blocks are not big enough"},
{"min_external_table_block_size_bytes", DEFAULT_INSERT_BLOCK_SIZE * 256, DEFAULT_INSERT_BLOCK_SIZE * 256, "Squash blocks passed to external table to specified size in bytes, if blocks are not big enough."},
{"parallel_replicas_prefer_local_join", true, true, "If true, and JOIN can be executed with parallel replicas algorithm, and all storages of right JOIN part are *MergeTree, local JOIN will be used instead of GLOBAL JOIN."},
{"extract_key_value_pairs_max_pairs_per_row", 0, 0, "Max number of pairs that can be produced by the `extractKeyValuePairs` function. Used as a safeguard against consuming too much memory."}}},
{"24.1", {{"print_pretty_type_names", false, true, "Better user experience."},
{"input_format_json_read_bools_as_strings", false, true, "Allow to read bools as strings in JSON formats by default"},

src/Planner/Planner.cpp

@@ -64,6 +64,7 @@
#include <Analyzer/AggregationUtils.h>
#include <Analyzer/WindowFunctionsUtils.h>
#include <Planner/findQueryForParallelReplicas.h>
#include <Planner/Utils.h>
#include <Planner/PlannerContext.h>
#include <Planner/PlannerActionsVisitor.h>
@@ -1057,7 +1058,7 @@ void addBuildSubqueriesForSetsStepIfNeeded(
Planner subquery_planner(
query_tree,
subquery_options,
std::make_shared<GlobalPlannerContext>()); //planner_context->getGlobalPlannerContext());
std::make_shared<GlobalPlannerContext>(nullptr, nullptr));
subquery_planner.buildQueryPlanIfNeeded();
subquery->setQueryPlan(std::make_unique<QueryPlan>(std::move(subquery_planner).extractQueryPlan()));
@@ -1160,7 +1161,10 @@ Planner::Planner(const QueryTreeNodePtr & query_tree_,
SelectQueryOptions & select_query_options_)
: query_tree(query_tree_)
, select_query_options(select_query_options_)
, planner_context(buildPlannerContext(query_tree, select_query_options, std::make_shared<GlobalPlannerContext>()))
, planner_context(buildPlannerContext(query_tree, select_query_options,
std::make_shared<GlobalPlannerContext>(
findQueryForParallelReplicas(query_tree, select_query_options),
findTableForParallelReplicas(query_tree, select_query_options))))
{
}
@@ -1223,6 +1227,8 @@ void Planner::buildPlanForUnionNode()
query_planner.buildQueryPlanIfNeeded();
for (const auto & row_policy : query_planner.getUsedRowPolicies())
used_row_policies.insert(row_policy);
const auto & mapping = query_planner.getQueryNodeToPlanStepMapping();
query_node_to_plan_step_mapping.insert(mapping.begin(), mapping.end());
auto query_node_plan = std::make_unique<QueryPlan>(std::move(query_planner).extractQueryPlan());
query_plans_headers.push_back(query_node_plan->getCurrentDataStream().header);
query_plans.push_back(std::move(query_node_plan));
@@ -1402,16 +1408,27 @@ void Planner::buildPlanForQueryNode()
}
}
JoinTreeQueryPlan join_tree_query_plan;
if (planner_context->getMutableQueryContext()->canUseTaskBasedParallelReplicas()
&& planner_context->getGlobalPlannerContext()->parallel_replicas_node == &query_node)
{
join_tree_query_plan = buildQueryPlanForParallelReplicas(query_node, planner_context, select_query_info.storage_limits);
}
else
{
auto top_level_identifiers = collectTopLevelColumnIdentifiers(query_tree, planner_context);
auto join_tree_query_plan = buildJoinTreeQueryPlan(query_tree,
join_tree_query_plan = buildJoinTreeQueryPlan(query_tree,
select_query_info,
select_query_options,
top_level_identifiers,
planner_context);
}
auto from_stage = join_tree_query_plan.from_stage;
query_plan = std::move(join_tree_query_plan.query_plan);
used_row_policies = std::move(join_tree_query_plan.used_row_policies);
auto & mapping = join_tree_query_plan.query_node_to_plan_step_mapping;
query_node_to_plan_step_mapping.insert(mapping.begin(), mapping.end());
LOG_TRACE(getLogger("Planner"), "Query {} from stage {} to stage {}{}",
query_tree->formatConvertedASTForErrorMessage(),
@@ -1681,6 +1698,8 @@ void Planner::buildPlanForQueryNode()
if (!select_query_options.only_analyze)
addBuildSubqueriesForSetsStepIfNeeded(query_plan, select_query_options, planner_context, result_actions_to_execute);
query_node_to_plan_step_mapping[&query_node] = query_plan.getRootNode();
}
SelectQueryInfo Planner::buildSelectQueryInfo() const

src/Planner/Planner.h

@@ -65,6 +65,11 @@ public:
return planner_context;
}
/// We keep a mapping QueryNode -> QueryPlanStep (the last step added to the plan from this query).
/// It is useful for parallel replicas analysis.
using QueryNodeToPlanStepMapping = std::unordered_map<const QueryNode *, const QueryPlan::Node *>;
const QueryNodeToPlanStepMapping & getQueryNodeToPlanStepMapping() const { return query_node_to_plan_step_mapping; }
private:
void buildPlanForUnionNode();
@@ -76,6 +81,7 @@ private:
QueryPlan query_plan;
StorageLimitsList storage_limits;
std::set<std::string> used_row_policies;
QueryNodeToPlanStepMapping query_node_to_plan_step_mapping;
};
}

src/Planner/PlannerContext.h

@@ -18,10 +18,18 @@ namespace DB
*
* 1. Column identifiers.
*/
class QueryNode;
class TableNode;
class GlobalPlannerContext
{
public:
GlobalPlannerContext() = default;
explicit GlobalPlannerContext(const QueryNode * parallel_replicas_node_, const TableNode * parallel_replicas_table_)
: parallel_replicas_node(parallel_replicas_node_)
, parallel_replicas_table(parallel_replicas_table_)
{
}
/** Create column identifier for column node.
*
@@ -38,6 +46,13 @@ public:
/// Check if context has column identifier
bool hasColumnIdentifier(const ColumnIdentifier & column_identifier);
/// The query which will be executed with parallel replicas.
/// If only the innermost subquery can be executed with parallel replicas, this is nullptr.
const QueryNode * const parallel_replicas_node = nullptr;
/// The table used for reading with parallel replicas. Currently, only one table per query is supported by the protocol.
/// It is the left-most table of the query (across JOINs, UNIONs and subqueries).
const TableNode * const parallel_replicas_table = nullptr;
private:
std::unordered_set<ColumnIdentifier> column_identifiers;
};

src/Planner/PlannerJoinTree.cpp

@@ -613,6 +613,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
auto * union_node = table_expression->as<UnionNode>();
QueryPlan query_plan;
std::unordered_map<const QueryNode *, const QueryPlan::Node *> query_node_to_plan_step_mapping;
std::set<std::string> used_row_policies;
if (table_node || table_function_node)
@@ -623,6 +624,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
auto table_expression_query_info = select_query_info;
table_expression_query_info.table_expression = table_expression;
table_expression_query_info.filter_actions_dag = table_expression_data.getFilterActions();
table_expression_query_info.analyzer_can_use_parallel_replicas_on_follower = table_node == planner_context->getGlobalPlannerContext()->parallel_replicas_table;
size_t max_streams = settings.max_threads;
size_t max_threads_execute_query = settings.max_threads;
@@ -915,6 +917,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
/// Propagate storage limits to subquery
subquery_planner.addStorageLimits(*select_query_info.storage_limits);
subquery_planner.buildQueryPlanIfNeeded();
const auto & mapping = subquery_planner.getQueryNodeToPlanStepMapping();
query_node_to_plan_step_mapping.insert(mapping.begin(), mapping.end());
query_plan = std::move(subquery_planner).extractQueryPlan();
}
}
@@ -974,6 +978,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
.query_plan = std::move(query_plan),
.from_stage = from_stage,
.used_row_policies = std::move(used_row_policies),
.query_node_to_plan_step_mapping = std::move(query_node_to_plan_step_mapping),
};
}
@@ -1520,11 +1525,16 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(const QueryTreeNodePtr & join_table_
if (join_clauses_and_actions.right_join_expressions_actions)
left_join_tree_query_plan.actions_dags.emplace_back(std::move(join_clauses_and_actions.right_join_expressions_actions));
auto mapping = std::move(left_join_tree_query_plan.query_node_to_plan_step_mapping);
auto & r_mapping = right_join_tree_query_plan.query_node_to_plan_step_mapping;
mapping.insert(r_mapping.begin(), r_mapping.end());
return JoinTreeQueryPlan{
.query_plan = std::move(result_plan),
.from_stage = QueryProcessingStage::FetchColumns,
.used_row_policies = std::move(left_join_tree_query_plan.used_row_policies),
.actions_dags = std::move(left_join_tree_query_plan.actions_dags),
.query_node_to_plan_step_mapping = std::move(mapping),
};
}
@@ -1611,6 +1621,7 @@ JoinTreeQueryPlan buildQueryPlanForArrayJoinNode(const QueryTreeNodePtr & array_
.from_stage = QueryProcessingStage::FetchColumns,
.used_row_policies = std::move(join_tree_query_plan.used_row_policies),
.actions_dags = std::move(join_tree_query_plan.actions_dags),
.query_node_to_plan_step_mapping = std::move(join_tree_query_plan.query_node_to_plan_step_mapping),
};
}

src/Planner/PlannerJoinTree.h

@@ -17,6 +17,7 @@ struct JoinTreeQueryPlan
QueryProcessingStage::Enum from_stage;
std::set<std::string> used_row_policies;
std::vector<ActionsDAGPtr> actions_dags;
std::unordered_map<const QueryNode *, const QueryPlan::Node *> query_node_to_plan_step_mapping;
};
/// Build JOIN TREE query plan for query node

src/Planner/Utils.cpp

@@ -37,6 +37,8 @@
#include <Planner/CollectTableExpressionData.h>
#include <Planner/CollectSets.h>
#include <stack>
namespace DB
{
@@ -130,6 +132,34 @@ ASTPtr queryNodeToSelectQuery(const QueryTreeNodePtr & query_node)
return result_ast;
}
static void removeCTEs(ASTPtr & ast)
{
std::stack<IAST *> stack;
stack.push(ast.get());
while (!stack.empty())
{
auto * node = stack.top();
stack.pop();
if (auto * subquery = typeid_cast<ASTSubquery *>(node))
subquery->cte_name = {};
for (const auto & child : node->children)
stack.push(child.get());
}
}
ASTPtr queryNodeToDistributedSelectQuery(const QueryTreeNodePtr & query_node)
{
auto ast = queryNodeToSelectQuery(query_node);
/// Remove CTE information from distributed queries.
/// Currently, if cte_name is set for a subquery node, AST -> String serialization prints only the CTE name.
/// But the CTE is defined only in the top-level query part, so it may not be sent along.
/// Removing cte_name forces the subquery to always be printed in full.
removeCTEs(ast);
return ast;
}
/** There are no limits on the maximum size of the result for the subquery.
* Since the result of the query is not the result of the entire query.
*/
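
The CTE stripping in removeCTEs above is a plain iterative depth-first walk over the AST. Below is a minimal standalone sketch of the same pattern, with a toy Node type standing in for IAST (illustrative only, not the real ClickHouse types):

#include <iostream>
#include <memory>
#include <stack>
#include <string>
#include <vector>

// Toy stand-in for IAST: children plus an optional CTE name.
struct Node
{
    std::string cte_name;  // non-empty => AST -> String serialization prints only this name
    std::vector<std::shared_ptr<Node>> children;
};

// Same shape as removeCTEs above: clear cte_name everywhere so that every
// subquery is serialized in full before being sent to a remote replica.
void removeCTEs(Node & root)
{
    std::stack<Node *> stack;
    stack.push(&root);
    while (!stack.empty())
    {
        Node * node = stack.top();
        stack.pop();
        node->cte_name.clear();
        for (const auto & child : node->children)
            stack.push(child.get());
    }
}

int main()
{
    Node root;
    auto subquery = std::make_shared<Node>();
    subquery->cte_name = "cte_1";
    root.children.push_back(subquery);
    removeCTEs(root);
    std::cout << (subquery->cte_name.empty() ? "printed in full" : "printed as name") << '\n';
}

An explicit stack avoids deep recursion on heavily nested queries, which is why the original uses the same shape.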

src/Planner/Utils.h

@@ -34,6 +34,9 @@ Block buildCommonHeaderForUnion(const Blocks & queries_headers, SelectUnionMode
/// Convert query node to ASTSelectQuery
ASTPtr queryNodeToSelectQuery(const QueryTreeNodePtr & query_node);
/// Convert query node to ASTSelectQuery for distributed processing
ASTPtr queryNodeToDistributedSelectQuery(const QueryTreeNodePtr & query_node);
/// Build context for subquery execution
ContextPtr buildSubqueryContext(const ContextPtr & context);

src/Planner/findQueryForParallelReplicas.cpp

@@ -0,0 +1,438 @@
#include <Planner/findQueryForParallelReplicas.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Storages/buildQueryTreeForShard.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Planner/PlannerJoinTree.h>
#include <Planner/Utils.h>
#include <Analyzer/ArrayJoinNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/JoinNode.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/UnionNode.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/queryToString.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/StorageDummy.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNSUPPORTED_METHOD;
}
/// Returns a list of (sub)queries (candidates) which may support parallel replicas.
/// The rule is:
/// the subquery has only LEFT or ALL INNER JOINs (or none at all), and the left part is a MergeTree table or itself a candidate subquery.
///
/// Additional checks are required, so we return many candidates. The innermost subquery is on top.
std::stack<const QueryNode *> getSupportingParallelReplicasQuery(const IQueryTreeNode * query_tree_node)
{
std::stack<const QueryNode *> res;
while (query_tree_node)
{
auto join_tree_node_type = query_tree_node->getNodeType();
switch (join_tree_node_type)
{
case QueryTreeNodeType::TABLE:
{
const auto & table_node = query_tree_node->as<TableNode &>();
const auto & storage = table_node.getStorage();
/// Here we check StorageDummy as well, to support a query tree with replaced storages.
if (std::dynamic_pointer_cast<MergeTreeData>(storage) || typeid_cast<const StorageDummy *>(storage.get()))
return res;
return {};
}
case QueryTreeNodeType::TABLE_FUNCTION:
{
return {};
}
case QueryTreeNodeType::QUERY:
{
const auto & query_node_to_process = query_tree_node->as<QueryNode &>();
query_tree_node = query_node_to_process.getJoinTree().get();
res.push(&query_node_to_process);
break;
}
case QueryTreeNodeType::UNION:
{
const auto & union_node = query_tree_node->as<UnionNode &>();
const auto & union_queries = union_node.getQueries().getNodes();
if (union_queries.empty())
return {};
query_tree_node = union_queries.front().get();
break;
}
case QueryTreeNodeType::ARRAY_JOIN:
{
const auto & array_join_node = query_tree_node->as<ArrayJoinNode &>();
query_tree_node = array_join_node.getTableExpression().get();
break;
}
case QueryTreeNodeType::JOIN:
{
const auto & join_node = query_tree_node->as<JoinNode &>();
auto join_kind = join_node.getKind();
auto join_strictness = join_node.getStrictness();
bool can_parallelize_join =
join_kind == JoinKind::Left
|| (join_kind == JoinKind::Inner && join_strictness == JoinStrictness::All);
if (!can_parallelize_join)
return {};
query_tree_node = join_node.getLeftTableExpression().get();
break;
}
default:
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Unexpected node type for table expression. "
"Expected table, table function, query, union, join or array join. Actual {}",
query_tree_node->getNodeTypeName());
}
}
}
return res;
}
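
To make this traversal concrete, here is a compile-and-run sketch of the candidate collection over a toy tree. Kind, JoinType and Node are simplified stand-ins for the analyzer's node types, and table functions / ARRAY JOINs are omitted for brevity:

#include <iostream>
#include <stack>
#include <vector>

enum class Kind { Table, Query, Union, Join };
enum class JoinType { Left, InnerAll, Other };

// Toy query-tree node. For Query the single child is the join tree,
// for Join the children are {left, right}, for Union they are the queries.
struct Node
{
    Kind kind;
    JoinType join = JoinType::Other;
    bool is_merge_tree = false;
    std::vector<const Node *> children;
};

// Mirrors getSupportingParallelReplicasQuery: walk the left-most branch,
// pushing every query node; the innermost candidate ends up on top.
// Any unsupported node or join kind discards all candidates.
std::stack<const Node *> collectCandidates(const Node * node)
{
    std::stack<const Node *> res;
    while (node)
    {
        switch (node->kind)
        {
            case Kind::Table:
                return node->is_merge_tree ? res : std::stack<const Node *>{};
            case Kind::Query:
                res.push(node);
                node = node->children.front();
                break;
            case Kind::Union:
                if (node->children.empty())
                    return {};
                node = node->children.front();
                break;
            case Kind::Join:
                if (node->join == JoinType::Other)
                    return {};                  // only LEFT or ALL INNER qualify
                node = node->children.front();  // only the left side is followed
                break;
        }
    }
    return res;
}

int main()
{
    Node table{Kind::Table};
    table.is_merge_tree = true;
    Node inner{Kind::Query};
    inner.children = {&table};
    Node outer{Kind::Query};
    outer.children = {&inner};
    std::cout << collectCandidates(&outer).size() << " candidates\n";  // 2, inner on top
}

Running it prints "2 candidates": both the outer and the inner query qualify, with the innermost on top, matching the contract described above.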
class ReplaceTableNodeToDummyVisitor : public InDepthQueryTreeVisitor<ReplaceTableNodeToDummyVisitor, true>
{
public:
using Base = InDepthQueryTreeVisitor<ReplaceTableNodeToDummyVisitor, true>;
using Base::Base;
void visitImpl(const QueryTreeNodePtr & node)
{
auto * table_node = node->as<TableNode>();
auto * table_function_node = node->as<TableFunctionNode>();
if (table_node || table_function_node)
{
const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot();
auto get_column_options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects().withVirtuals();
auto storage_dummy
= std::make_shared<StorageDummy>(storage_snapshot->storage.getStorageID(), ColumnsDescription(storage_snapshot->getColumns(get_column_options)));
auto dummy_table_node = std::make_shared<TableNode>(std::move(storage_dummy), context);
dummy_table_node->setAlias(node->getAlias());
replacement_map.emplace(node.get(), std::move(dummy_table_node));
}
}
ContextPtr context;
std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map;
};
QueryTreeNodePtr replaceTablesWithDummyTables(const QueryTreeNodePtr & query, const ContextPtr & context)
{
ReplaceTableNodeToDummyVisitor visitor;
visitor.context = context;
visitor.visit(query);
return query->cloneAndReplace(visitor.replacement_map);
}
/// Find the best candidate for parallel replicas execution by verifying the query plan.
/// If the query plan has only Expression, Filter or Join steps, we can execute it fully remotely and check the next query.
/// Otherwise we can execute the current query only up to WithMergeableState.
const QueryNode * findQueryForParallelReplicas(
std::stack<const QueryNode *> stack,
const std::unordered_map<const QueryNode *, const QueryPlan::Node *> & mapping)
{
const QueryPlan::Node * prev_checked_node = nullptr;
const QueryNode * res = nullptr;
while (!stack.empty())
{
const QueryNode * subquery_node = stack.top();
stack.pop();
auto it = mapping.find(subquery_node);
/// This should not happen ideally.
if (it == mapping.end())
break;
const QueryPlan::Node * curr_node = it->second;
const QueryPlan::Node * next_node_to_check = curr_node;
bool can_distribute_full_node = true;
while (next_node_to_check && next_node_to_check != prev_checked_node)
{
const auto & children = next_node_to_check->children;
auto * step = next_node_to_check->step.get();
if (children.empty())
{
/// Found a source step. This should be possible only in the first iteration.
if (prev_checked_node)
return nullptr;
next_node_to_check = nullptr;
}
else if (children.size() == 1)
{
const auto * expression = typeid_cast<ExpressionStep *>(step);
const auto * filter = typeid_cast<FilterStep *>(step);
if (!expression && !filter)
can_distribute_full_node = false;
next_node_to_check = children.front();
}
else
{
const auto * join = typeid_cast<JoinStep *>(step);
/// We've checked that JOIN is INNER/LEFT in query tree.
/// Don't distribute UNION node.
if (!join)
return res;
next_node_to_check = children.front();
}
}
/// The current node contains steps like GROUP BY / DISTINCT.
/// We will try to execute the query up to WithMergeableState.
if (!can_distribute_full_node)
{
/// Current query node does not contain subqueries.
/// We can execute parallel replicas over storage::read.
if (!res)
return nullptr;
return subquery_node;
}
/// Query is simple enough to be fully distributed.
res = subquery_node;
prev_checked_node = curr_node;
}
return res;
}
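
The loop above distinguishes three outcomes per candidate. A self-contained sketch of that check, with step names as plain strings instead of the real IQueryPlanStep types (an approximation, not the actual API):

#include <iostream>
#include <string>
#include <vector>

// Toy plan node: a step name plus children (inputs).
struct PlanNode
{
    std::string step;
    std::vector<const PlanNode *> children;
};

enum class Verdict
{
    FullyRemote,    // only Expression/Filter/Join above the source: the outer query may be checked too
    MergeableOnly,  // heavier steps (GROUP BY / DISTINCT / ...): run up to WithMergeableState
    KeepPrevious,   // a non-JOIN fan-in (e.g. UNION): fall back to the previous candidate
};

// Walk from the last step of a candidate subquery down to the step already
// verified for the inner candidate (`stop`), as in the loop above.
Verdict checkCandidate(const PlanNode * node, const PlanNode * stop)
{
    Verdict verdict = Verdict::FullyRemote;
    while (node && node != stop)
    {
        if (node->children.empty())
            node = nullptr;  // reached a source step
        else if (node->children.size() == 1)
        {
            if (node->step != "Expression" && node->step != "Filter")
                verdict = Verdict::MergeableOnly;
            node = node->children.front();
        }
        else
        {
            if (node->step != "Join")
                return Verdict::KeepPrevious;
            node = node->children.front();  // continue down the left input
        }
    }
    return verdict;
}

int main()
{
    PlanNode source{"ReadFromMergeTree", {}};
    PlanNode filter{"Filter", {&source}};
    PlanNode aggregating{"Aggregating", {&filter}};
    std::cout << (checkCandidate(&aggregating, nullptr) == Verdict::MergeableOnly) << '\n';  // 1
}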
const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options)
{
if (select_query_options.only_analyze)
return nullptr;
auto * query_node = query_tree_node->as<QueryNode>();
auto * union_node = query_tree_node->as<UnionNode>();
if (!query_node && !union_node)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Expected QUERY or UNION node. Actual {}",
query_tree_node->formatASTForErrorMessage());
auto context = query_node ? query_node->getContext() : union_node->getContext();
if (!context->canUseParallelReplicasOnInitiator())
return nullptr;
auto stack = getSupportingParallelReplicasQuery(query_tree_node.get());
/// Empty stack means that storage does not support parallel replicas.
if (stack.empty())
return nullptr;
/// There are no subqueries, and the storage can process parallel replicas by itself.
if (stack.top() == query_tree_node.get())
return nullptr;
/// This is needed to avoid infinite recursion.
auto mutable_context = Context::createCopy(context);
mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
/// Here we replace tables with dummy ones, in order to build a temporary query plan for parallel replicas analysis.
ResultReplacementMap replacement_map;
auto updated_query_tree = replaceTablesWithDummyTables(query_tree_node, mutable_context);
SelectQueryOptions options;
Planner planner(updated_query_tree, options, std::make_shared<GlobalPlannerContext>(nullptr, nullptr));
planner.buildQueryPlanIfNeeded();
/// This part is a bit clumsy.
/// We updated the query tree with dummy storages, and the mapping now refers to updated_query_tree.
/// But the resulting QueryNode should be taken from the initial query tree.
/// So we build the list of candidates again and call findQueryForParallelReplicas on it.
auto new_stack = getSupportingParallelReplicasQuery(updated_query_tree.get());
const auto & mapping = planner.getQueryNodeToPlanStepMapping();
const auto * res = findQueryForParallelReplicas(new_stack, mapping);
/// Now, return a query from initial stack.
if (res)
{
while (!new_stack.empty())
{
if (res == new_stack.top())
return stack.top();
stack.pop();
new_stack.pop();
}
}
return res;
}
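
The final remapping relies only on the fact that both stacks were produced by the same traversal order over the original tree and its dummy-storage clone. A tiny sketch of that trick with toy data:

#include <cassert>
#include <stack>
#include <string>

// Map a node found in the cloned tree back to its counterpart in the original
// tree by popping two identically ordered stacks in lockstep.
template <typename T>
const T * mapBack(std::stack<const T *> original, std::stack<const T *> cloned, const T * found_in_clone)
{
    while (!cloned.empty())
    {
        if (cloned.top() == found_in_clone)
            return original.top();
        cloned.pop();
        original.pop();
    }
    return nullptr;
}

int main()
{
    std::string orig_inner = "inner", orig_outer = "outer";
    std::string clone_inner = "inner'", clone_outer = "outer'";
    std::stack<const std::string *> original, cloned;
    original.push(&orig_outer); original.push(&orig_inner);  // innermost on top, as above
    cloned.push(&clone_outer);  cloned.push(&clone_inner);
    assert(mapBack(original, cloned, &clone_outer) == &orig_outer);
}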
static const TableNode * findTableForParallelReplicas(const IQueryTreeNode * query_tree_node)
{
std::stack<const IQueryTreeNode *> right_join_nodes;
while (query_tree_node || !right_join_nodes.empty())
{
if (!query_tree_node)
{
query_tree_node = right_join_nodes.top();
right_join_nodes.pop();
}
auto join_tree_node_type = query_tree_node->getNodeType();
switch (join_tree_node_type)
{
case QueryTreeNodeType::TABLE:
{
const auto & table_node = query_tree_node->as<TableNode &>();
const auto & storage = table_node.getStorage();
if (std::dynamic_pointer_cast<MergeTreeData>(storage) || typeid_cast<const StorageDummy *>(storage.get()))
return &table_node;
query_tree_node = nullptr;
break;
}
case QueryTreeNodeType::TABLE_FUNCTION:
{
query_tree_node = nullptr;
break;
}
case QueryTreeNodeType::QUERY:
{
const auto & query_node_to_process = query_tree_node->as<QueryNode &>();
query_tree_node = query_node_to_process.getJoinTree().get();
break;
}
case QueryTreeNodeType::UNION:
{
const auto & union_node = query_tree_node->as<UnionNode &>();
const auto & union_queries = union_node.getQueries().getNodes();
query_tree_node = nullptr;
if (!union_queries.empty())
query_tree_node = union_queries.front().get();
break;
}
case QueryTreeNodeType::ARRAY_JOIN:
{
const auto & array_join_node = query_tree_node->as<ArrayJoinNode &>();
query_tree_node = array_join_node.getTableExpression().get();
break;
}
case QueryTreeNodeType::JOIN:
{
const auto & join_node = query_tree_node->as<JoinNode &>();
query_tree_node = join_node.getLeftTableExpression().get();
right_join_nodes.push(join_node.getRightTableExpression().get());
break;
}
default:
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Unexpected node type for table expression. "
"Expected table, table function, query, union, join or array join. Actual {}",
query_tree_node->getNodeTypeName());
}
}
}
return nullptr;
}
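
A runnable sketch of this search over a toy tree (simplified stand-ins for the analyzer nodes; table functions and ARRAY JOINs are omitted): the left-most branch is tried first, and right-hand JOIN subtrees are visited only after the current branch dead-ends:

#include <iostream>
#include <stack>
#include <vector>

enum class Kind { Table, Query, Union, Join };

// Toy node: for Join, children are {left, right}; for Query/Union the first
// child is the join tree / first union query.
struct Node
{
    Kind kind;
    bool is_merge_tree = false;
    std::vector<const Node *> children;
};

// Mirrors findTableForParallelReplicas: return the left-most MergeTree table,
// falling back to queued right-hand JOIN subtrees when a branch has none.
const Node * findLeftmostMergeTree(const Node * node)
{
    std::stack<const Node *> right_join_nodes;
    while (node || !right_join_nodes.empty())
    {
        if (!node)
        {
            node = right_join_nodes.top();
            right_join_nodes.pop();
        }
        switch (node->kind)
        {
            case Kind::Table:
                if (node->is_merge_tree)
                    return node;
                node = nullptr;  // dead end; try a queued right subtree
                break;
            case Kind::Join:
                right_join_nodes.push(node->children[1]);
                node = node->children[0];
                break;
            case Kind::Query:
            case Kind::Union:
                node = node->children.empty() ? nullptr : node->children.front();
                break;
        }
    }
    return nullptr;
}

int main()
{
    Node left_table{Kind::Table};  // not a MergeTree
    Node right_table{Kind::Table};
    right_table.is_merge_tree = true;
    Node join{Kind::Join};
    join.children = {&left_table, &right_table};
    Node query{Kind::Query};
    query.children = {&join};
    std::cout << (findLeftmostMergeTree(&query) == &right_table) << '\n';  // 1: fallback used
}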
const TableNode * findTableForParallelReplicas(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options)
{
if (select_query_options.only_analyze)
return nullptr;
auto * query_node = query_tree_node->as<QueryNode>();
auto * union_node = query_tree_node->as<UnionNode>();
if (!query_node && !union_node)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Expected QUERY or UNION node. Actual {}",
query_tree_node->formatASTForErrorMessage());
auto context = query_node ? query_node->getContext() : union_node->getContext();
if (!context->canUseParallelReplicasOnFollower())
return nullptr;
return findTableForParallelReplicas(query_tree_node.get());
}
JoinTreeQueryPlan buildQueryPlanForParallelReplicas(
const QueryNode & query_node,
const PlannerContextPtr & planner_context,
std::shared_ptr<const StorageLimitsList> storage_limits)
{
auto processed_stage = QueryProcessingStage::WithMergeableState;
auto context = planner_context->getQueryContext();
QueryTreeNodePtr modified_query_tree = query_node.clone();
Block initial_header = InterpreterSelectQueryAnalyzer::getSampleBlock(
modified_query_tree, context, SelectQueryOptions(processed_stage).analyze());
rewriteJoinToGlobalJoin(modified_query_tree, context);
modified_query_tree = buildQueryTreeForShard(planner_context, modified_query_tree);
ASTPtr modified_query_ast = queryNodeToDistributedSelectQuery(modified_query_tree);
Block header = InterpreterSelectQueryAnalyzer::getSampleBlock(
modified_query_tree, context, SelectQueryOptions(processed_stage).analyze());
ClusterProxy::SelectStreamFactory select_stream_factory =
ClusterProxy::SelectStreamFactory(
header,
{},
{},
processed_stage);
QueryPlan query_plan;
ClusterProxy::executeQueryWithParallelReplicas(
query_plan,
select_stream_factory,
modified_query_ast,
context,
storage_limits);
auto converting = ActionsDAG::makeConvertingActions(
header.getColumnsWithTypeAndName(),
initial_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
/// initial_header is the header expected by the initial query.
/// header is the header returned by the follower.
/// They are different because the tables have different aliases (e.g. _table1 or _table5).
/// Here we just rename columns by position, hoping that the types match.
auto step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), std::move(converting));
step->setStepDescription("Convert distributed names");
query_plan.addStep(std::move(step));
return {std::move(query_plan), std::move(processed_stage), {}, {}, {}};
}
}
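
The header conversion at the end of buildQueryPlanForParallelReplicas matches columns purely by position. A small sketch of that idea with plain strings in place of ActionsDAG (the real makeConvertingActions also inserts type conversions):

#include <cstddef>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

using Header = std::vector<std::string>;  // toy header: column names only

// Positional renaming, in the spirit of ActionsDAG::makeConvertingActions with
// MatchColumnsMode::Position: column i of the follower's header is exposed
// under the name the initiator expects at position i.
std::vector<std::pair<std::string, std::string>> renameByPosition(const Header & from, const Header & to)
{
    std::vector<std::pair<std::string, std::string>> aliases;
    for (size_t i = 0; i < from.size() && i < to.size(); ++i)
        aliases.emplace_back(from[i], to[i]);
    return aliases;
}

int main()
{
    // Aliases differ between initiator and follower (e.g. __table1 vs __table5).
    for (const auto & [follower, initiator] : renameByPosition(
             {"__table5.key", "__table5.value"}, {"__table1.key", "__table1.value"}))
        std::cout << follower << " -> " << initiator << '\n';
}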

src/Planner/findQueryForParallelReplicas.h

@@ -0,0 +1,38 @@
#pragma once
#include <list>
#include <memory>
namespace DB
{
class QueryNode;
class TableNode;
class IQueryTreeNode;
using QueryTreeNodePtr = std::shared_ptr<IQueryTreeNode>;
struct SelectQueryOptions;
/// Find a query which can be executed with parallel replicas up to WithMergeableState.
/// The returned query will always contain some (>1) subqueries, possibly with joins.
const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options);
/// Find the table from which we should read on a follower replica. It's the left-most table within all JOINs and UNIONs.
const TableNode * findTableForParallelReplicas(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options);
struct JoinTreeQueryPlan;
class PlannerContext;
using PlannerContextPtr = std::shared_ptr<PlannerContext>;
struct StorageLimits;
using StorageLimitsList = std::list<StorageLimits>;
/// Execute the QueryNode with parallel replicas up to WithMergeableState and return a plan.
/// This method does not check that the QueryNode is valid. Ideally it should be a result of findQueryForParallelReplicas.
JoinTreeQueryPlan buildQueryPlanForParallelReplicas(
const QueryNode & query_node,
const PlannerContextPtr & planner_context,
std::shared_ptr<const StorageLimitsList> storage_limits);
}

src/Storages/SelectQueryInfo.h

@@ -161,6 +161,8 @@ struct SelectQueryInfo
/// It's guaranteed to be present in JOIN TREE of `query_tree`
QueryTreeNodePtr table_expression;
bool analyzer_can_use_parallel_replicas_on_follower = false;
/// Table expression modifiers for storage
std::optional<TableExpressionModifiers> table_expression_modifiers;

src/Storages/StorageDistributed.cpp

@@ -797,7 +797,7 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info,
auto query_tree_to_modify = query_info.query_tree->cloneAndReplace(query_info.table_expression, std::move(replacement_table_expression));
return buildQueryTreeForShard(query_info, query_tree_to_modify);
return buildQueryTreeForShard(query_info.planner_context, query_tree_to_modify);
}
}
@@ -831,7 +831,7 @@ void StorageDistributed::read(
*/
for (auto & column : header)
column.column = column.column->convertToFullColumnIfConst();
query_ast = queryNodeToSelectQuery(query_tree_distributed);
query_ast = queryNodeToDistributedSelectQuery(query_tree_distributed);
}
else
{

src/Storages/StorageMergeTree.cpp

@@ -221,11 +221,11 @@ void StorageMergeTree::read(
if (local_context->getSettingsRef().allow_experimental_analyzer)
{
QueryTreeNodePtr modified_query_tree = query_info.query_tree->clone();
rewriteJoinToGlobalJoin(modified_query_tree);
modified_query_tree = buildQueryTreeForShard(query_info, modified_query_tree);
rewriteJoinToGlobalJoin(modified_query_tree, local_context);
modified_query_tree = buildQueryTreeForShard(query_info.planner_context, modified_query_tree);
header = InterpreterSelectQueryAnalyzer::getSampleBlock(
modified_query_tree, local_context, SelectQueryOptions(processed_stage).analyze());
modified_query_ast = queryNodeToSelectQuery(modified_query_tree);
modified_query_ast = queryNodeToDistributedSelectQuery(modified_query_tree);
}
else
{
@@ -252,7 +252,9 @@
}
else
{
const bool enable_parallel_reading = local_context->canUseParallelReplicasOnFollower() && local_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree;
const bool enable_parallel_reading = local_context->canUseParallelReplicasOnFollower()
&& local_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree
&& (!local_context->getSettingsRef().allow_experimental_analyzer || query_info.analyzer_can_use_parallel_replicas_on_follower);
if (auto plan = reader.read(
column_names,

src/Storages/StorageReplicatedMergeTree.cpp

@@ -5368,12 +5368,12 @@ void StorageReplicatedMergeTree::readParallelReplicasImpl(
if (local_context->getSettingsRef().allow_experimental_analyzer)
{
QueryTreeNodePtr modified_query_tree = query_info.query_tree->clone();
rewriteJoinToGlobalJoin(modified_query_tree);
modified_query_tree = buildQueryTreeForShard(query_info, modified_query_tree);
rewriteJoinToGlobalJoin(modified_query_tree, local_context);
modified_query_tree = buildQueryTreeForShard(query_info.planner_context, modified_query_tree);
header = InterpreterSelectQueryAnalyzer::getSampleBlock(
modified_query_tree, local_context, SelectQueryOptions(processed_stage).analyze());
modified_query_ast = queryNodeToSelectQuery(modified_query_tree);
modified_query_ast = queryNodeToDistributedSelectQuery(modified_query_tree);
}
else
{
@@ -5407,11 +5407,14 @@ void StorageReplicatedMergeTree::readLocalImpl(
const size_t max_block_size,
const size_t num_streams)
{
const bool enable_parallel_reading = local_context->canUseParallelReplicasOnFollower()
&& (!local_context->getSettingsRef().allow_experimental_analyzer || query_info.analyzer_can_use_parallel_replicas_on_follower);
auto plan = reader.read(
column_names, storage_snapshot, query_info,
local_context, max_block_size, num_streams,
/* max_block_numbers_to_read= */ nullptr,
/* enable_parallel_reading= */ local_context->canUseParallelReplicasOnFollower());
enable_parallel_reading);
if (plan)
query_plan = std::move(*plan);

src/Storages/buildQueryTreeForShard.cpp

@@ -16,6 +16,7 @@
#include <Storages/StorageDummy.h>
#include <Planner/Utils.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@@ -283,7 +284,16 @@ TableNodePtr executeSubqueryNode(const QueryTreeNodePtr & subquery_node,
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(mutable_context);
auto build_pipeline_settings = BuildQueryPipelineSettings::fromContext(mutable_context);
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*query_plan.buildQueryPipeline(optimization_settings, build_pipeline_settings)));
auto builder = query_plan.buildQueryPipeline(optimization_settings, build_pipeline_settings);
size_t min_block_size_rows = mutable_context->getSettingsRef().min_external_table_block_size_rows;
size_t min_block_size_bytes = mutable_context->getSettingsRef().min_external_table_block_size_bytes;
auto squashing = std::make_shared<SimpleSquashingChunksTransform>(builder->getHeader(), min_block_size_rows, min_block_size_bytes);
builder->resize(1);
builder->addTransform(std::move(squashing));
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
pipeline.complete(std::move(table_out));
CompletedPipelineExecutor executor(pipeline);
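
The squashing inserted above buffers small chunks until one of the two new thresholds is reached, so the external table receives a few large blocks instead of many tiny ones. A standalone sketch of that policy, with a toy Chunk and Squasher in place of SimpleSquashingChunksTransform (an approximation of its semantics):

#include <cstddef>
#include <iostream>
#include <optional>

// Toy chunk: row/byte counters only.
struct Chunk
{
    size_t rows = 0;
    size_t bytes = 0;
};

// Accumulate chunks and emit one squashed block once either threshold is hit;
// a threshold of 0 disables that check (mirrors min_external_table_block_size_*).
class Squasher
{
public:
    Squasher(size_t min_rows_, size_t min_bytes_) : min_rows(min_rows_), min_bytes(min_bytes_) {}

    std::optional<Chunk> add(const Chunk & chunk)
    {
        acc.rows += chunk.rows;
        acc.bytes += chunk.bytes;
        if ((min_rows && acc.rows >= min_rows) || (min_bytes && acc.bytes >= min_bytes))
        {
            Chunk out = acc;
            acc = {};
            return out;
        }
        return std::nullopt;
    }

    Chunk flush() { Chunk out = acc; acc = {}; return out; }

private:
    size_t min_rows;
    size_t min_bytes;
    Chunk acc;
};

int main()
{
    Squasher squasher(/*min_rows=*/1000, /*min_bytes=*/0);
    for (int i = 0; i < 5; ++i)
        if (auto block = squasher.add({300, 4096}))
            std::cout << "emit " << block->rows << " rows\n";  // fires once, at 1200 rows
    std::cout << "tail " << squasher.flush().rows << " rows\n";  // remaining 300 rows
}

With min_rows = 1000 this emits once at 1200 accumulated rows and flushes the 300-row tail at the end.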
@@ -295,10 +305,8 @@
}
QueryTreeNodePtr buildQueryTreeForShard(SelectQueryInfo & query_info, QueryTreeNodePtr query_tree_to_modify)
QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify)
{
auto & planner_context = query_info.planner_context;
CollectColumnSourceToColumnsVisitor collect_column_source_to_columns_visitor;
collect_column_source_to_columns_visitor.visit(query_tree_to_modify);
@@ -378,17 +386,48 @@ QueryTreeNodePtr buildQueryTreeForShard(SelectQueryInfo & query_info, QueryTreeN
return query_tree_to_modify;
}
class RewriteJoinToGlobalJoinVisitor : public InDepthQueryTreeVisitor<RewriteJoinToGlobalJoinVisitor>
class CollectStoragesVisitor : public InDepthQueryTreeVisitor<CollectStoragesVisitor>
{
public:
using Base = InDepthQueryTreeVisitor<RewriteJoinToGlobalJoinVisitor>;
using Base = InDepthQueryTreeVisitor<CollectStoragesVisitor>;
using Base::Base;
void visitImpl(QueryTreeNodePtr & node)
{
if (auto * table_node = node->as<TableNode>())
storages.push_back(table_node->getStorage());
}
std::vector<StoragePtr> storages;
};
class RewriteJoinToGlobalJoinVisitor : public InDepthQueryTreeVisitorWithContext<RewriteJoinToGlobalJoinVisitor>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<RewriteJoinToGlobalJoinVisitor>;
using Base::Base;
static bool allStoragesAreMergeTree(QueryTreeNodePtr & node)
{
CollectStoragesVisitor collect_storages;
collect_storages.visit(node);
for (const auto & storage : collect_storages.storages)
if (!storage->isMergeTree())
return false;
return true;
}
void enterImpl(QueryTreeNodePtr & node)
{
if (auto * join_node = node->as<JoinNode>())
{
bool prefer_local_join = getContext()->getSettingsRef().parallel_replicas_prefer_local_join;
bool should_use_global_join = !prefer_local_join || !allStoragesAreMergeTree(join_node->getRightTableExpression());
if (should_use_global_join)
join_node->setLocality(JoinLocality::Global);
}
}
static bool needChildVisit(QueryTreeNodePtr & parent, QueryTreeNodePtr & child)
{
@@ -400,9 +439,9 @@ public:
}
};
void rewriteJoinToGlobalJoin(QueryTreeNodePtr query_tree_to_modify)
void rewriteJoinToGlobalJoin(QueryTreeNodePtr query_tree_to_modify, ContextPtr context)
{
RewriteJoinToGlobalJoinVisitor visitor;
RewriteJoinToGlobalJoinVisitor visitor(context);
visitor.visit(query_tree_to_modify);
}
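
The rewrite decision above boils down to a single predicate: a JOIN stays local only when parallel_replicas_prefer_local_join is enabled and every storage on its right side is a MergeTree. A trivial sketch, with a vector of flags standing in for the collected storages:

#include <iostream>
#include <vector>

// One flag per storage collected from the right-hand side of the JOIN.
bool allStoragesAreMergeTree(const std::vector<bool> & right_side)
{
    for (bool is_merge_tree : right_side)
        if (!is_merge_tree)
            return false;
    return true;
}

// Mirrors RewriteJoinToGlobalJoinVisitor: anything else is rewritten to GLOBAL JOIN.
bool shouldUseGlobalJoin(bool prefer_local_join, const std::vector<bool> & right_side)
{
    return !prefer_local_join || !allStoragesAreMergeTree(right_side);
}

int main()
{
    std::cout << shouldUseGlobalJoin(true, {true, true}) << '\n';   // 0: stays local
    std::cout << shouldUseGlobalJoin(true, {true, false}) << '\n';  // 1: rewritten to GLOBAL
    std::cout << shouldUseGlobalJoin(false, {true, true}) << '\n';  // 1: rewritten to GLOBAL
}

When the predicate fails, the JOIN becomes GLOBAL, so the right-hand side is executed once and shipped to the replicas as a temporary table (the `_data_` table visible in the test output below).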

src/Storages/buildQueryTreeForShard.h

@@ -10,8 +10,14 @@ struct SelectQueryInfo;
class IQueryTreeNode;
using QueryTreeNodePtr = std::shared_ptr<IQueryTreeNode>;
QueryTreeNodePtr buildQueryTreeForShard(SelectQueryInfo & query_info, QueryTreeNodePtr query_tree_to_modify);
class PlannerContext;
using PlannerContextPtr = std::shared_ptr<PlannerContext>;
void rewriteJoinToGlobalJoin(QueryTreeNodePtr query_tree_to_modify);
class Context;
using ContextPtr = std::shared_ptr<const Context>;
QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify);
void rewriteJoinToGlobalJoin(QueryTreeNodePtr query_tree_to_modify, ContextPtr context);
}


@@ -58,8 +58,7 @@ U c 10
UlI+1 10
bX?}ix [ Ny]2 G 10
t<iT X48q:Z]t0 10
0 3 SELECT `__table1`.`key` AS `key`, `__table1`.`value1` AS `value1`, `__table1`.`value2` AS `value2` FROM `default`.`join_inner_table` AS `__table1` PREWHERE (`__table1`.`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`__table1`.`number` > _CAST(1610517366120, \'UInt64\')) GROUP BY `__table1`.`key`, `__table1`.`value1`, `__table1`.`value2`
0 3 SELECT `__table2`.`value1` AS `value1`, `__table2`.`value2` AS `value2`, count() AS `count` FROM `default`.`join_outer_table` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` USING (`key`) GROUP BY `__table1`.`key`, `__table2`.`value1`, `__table2`.`value2`
0 3 SELECT `__table2`.`value1` AS `value1`, `__table2`.`value2` AS `value2`, count() AS `count` FROM `default`.`join_outer_table` AS `__table1` ALL INNER JOIN (SELECT `__table3`.`key` AS `key`, `__table3`.`value1` AS `value1`, `__table3`.`value2` AS `value2` FROM `default`.`join_inner_table` AS `__table3` PREWHERE (`__table3`.`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`__table3`.`number` > _CAST(1610517366120, \'UInt64\')) GROUP BY `__table3`.`key`, `__table3`.`value1`, `__table3`.`value2`) AS `__table2` USING (`key`) GROUP BY `__table1`.`key`, `__table2`.`value1`, `__table2`.`value2`
0 3 SELECT `key`, `value1`, `value2` FROM `default`.`join_inner_table` PREWHERE (`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`number` > toUInt64(\'1610517366120\')) GROUP BY `key`, `value1`, `value2`
0 3 SELECT `value1`, `value2`, count() AS `count` FROM `default`.`join_outer_table` ALL INNER JOIN `_data_` USING (`key`) GROUP BY `key`, `value1`, `value2`
1 1 -- Parallel full query\nSELECT\n value1,\n value2,\n avg(count) AS avg\nFROM\n (\n SELECT\n key,\n value1,\n value2,\n count() AS count\n FROM join_outer_table\n INNER JOIN\n (\n SELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\n FROM join_inner_table\n PREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\n GROUP BY key, value1, value2\n ) USING (key)\n GROUP BY key, value1, value2\n )\nGROUP BY value1, value2\nORDER BY value1, value2\nSETTINGS allow_experimental_parallel_reading_from_replicas = 1, allow_experimental_analyzer=0;


@@ -64,6 +64,7 @@ function run_query_with_pure_parallel_replicas () {
--query_id "${1}_pure" \
--max_parallel_replicas 3 \
--prefer_localhost_replica 1 \
--parallel_replicas_prefer_local_join 0 \
--cluster_for_parallel_replicas "parallel_replicas" \
--allow_experimental_parallel_reading_from_replicas 1 \
--parallel_replicas_for_non_replicated_merge_tree 1 \


@@ -0,0 +1,177 @@
simple join with analyzer
4200000 4200000 4200000 -1400000
4200006 4200006 4200006 -1400002
4200012 4200012 4200012 -1400004
4200018 4200018 4200018 -1400006
4200024 4200024 4200024 -1400008
4200030 4200030 4200030 -1400010
4200036 4200036 4200036 -1400012
4200042 4200042 4200042 -1400014
4200048 4200048 4200048 -1400016
4200054 4200054 4200054 -1400018
simple (global) join with analyzer and parallel replicas
4200000 4200000 4200000 -1400000
4200006 4200006 4200006 -1400002
4200012 4200012 4200012 -1400004
4200018 4200018 4200018 -1400006
4200024 4200024 4200024 -1400008
4200030 4200030 4200030 -1400010
4200036 4200036 4200036 -1400012
4200042 4200042 4200042 -1400014
4200048 4200048 4200048 -1400016
4200054 4200054 4200054 -1400018
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, allow_experimental_parallel_reading_from_replicas = 2, send_logs_level = 'trace', max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 0 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, allow_experimental_parallel_reading_from_replicas = 2, send_logs_level = 'trace', max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 0 (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
simple (local) join with analyzer and parallel replicas
4200000 4200000 4200000 -1400000
4200006 4200006 4200006 -1400002
4200012 4200012 4200012 -1400004
4200018 4200018 4200018 -1400006
4200024 4200024 4200024 -1400008
4200030 4200030 4200030 -1400010
4200036 4200036 4200036 -1400012
4200042 4200042 4200042 -1400014
4200048 4200048 4200048 -1400016
4200054 4200054 4200054 -1400018
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4`) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 1 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4`) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 1 (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
simple (local) join with analyzer and parallel replicas and full sorting merge join
4200000 4200000 4200000 -1400000
4200006 4200006 4200006 -1400002
4200012 4200012 4200012 -1400004
4200018 4200018 4200018 -1400006
4200024 4200024 4200024 -1400008
4200030 4200030 4200030 -1400010
4200036 4200036 4200036 -1400012
4200042 4200042 4200042 -1400014
4200048 4200048 4200048 -1400016
4200054 4200054 4200054 -1400018
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4`) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, join_algorithm = 'full_sorting_merge', send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 1 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4`) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(700000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, join_algorithm = 'full_sorting_merge', send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 1 (stage: WithMergeableState)
<Debug> WithOrderCoordinator: Coordination done
nested join with analyzer
420000 420000 420000 -140000
420042 420042 420042 -140014
420084 420084 420084 -140028
420126 420126 420126 -140042
420168 420168 420168 -140056
420210 420210 420210 -140070
420252 420252 420252 -140084
420294 420294 420294 -140098
420336 420336 420336 -140112
420378 420378 420378 -140126
nested join with analyzer and parallel replicas, both local
420000 420000 420000 -140000
420042 420042 420042 -140014
420084 420084 420084 -140028
420126 420126 420126 -140042
420168 420168 420168 -140056
420210 420210 420210 -140070
420252 420252 420252 -140084
420294 420294 420294 -140098
420336 420336 420336 -140112
420378 420378 420378 -140126
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4` ALL INNER JOIN (SELECT `__table6`.`number` * 7 AS `key` FROM numbers(100000.) AS `__table6`) AS `__table5` ON `__table4`.`key` = `__table5`.`key` SETTINGS parallel_replicas_prefer_local_join = 1) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, join_algorithm = 'full_sorting_merge', send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 1 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` ALL INNER JOIN (SELECT `__table4`.`key` AS `key`, `__table4`.`value` AS `value` FROM `default`.`num_2` AS `__table4` ALL INNER JOIN (SELECT `__table6`.`number` * 7 AS `key` FROM numbers(100000.) AS `__table6`) AS `__table5` ON `__table4`.`key` = `__table5`.`key` SETTINGS parallel_replicas_prefer_local_join = 1) AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, join_algorithm = 'full_sorting_merge', send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 1 (stage: WithMergeableState)
<Debug> WithOrderCoordinator: Coordination done
nested join with analyzer and parallel replicas, both global
420000 420000 420000 -140000
420042 420042 420042 -140014
420084 420084 420084 -140028
420126 420126 420126 -140042
420168 420168 420168 -140056
420210 420210 420210 -140070
420252 420252 420252 -140084
420294 420294 420294 -140098
420336 420336 420336 -140112
420378 420378 420378 -140126
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS parallel_replicas_prefer_local_join = 0 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS parallel_replicas_prefer_local_join = 0 (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 0 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 0 (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
nested join with analyzer and parallel replicas, global + local
420000 420000 420000 -140000
420042 420042 420042 -140014
420084 420084 420084 -140028
420126 420126 420126 -140042
420168 420168 420168 -140056
420210 420210 420210 -140070
420252 420252 420252 -140084
420294 420294 420294 -140098
420336 420336 420336 -140112
420378 420378 420378 -140126
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` ALL INNER JOIN (SELECT `__table3`.`number` * 7 AS `key` FROM numbers(100000.) AS `__table3`) AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS parallel_replicas_prefer_local_join = 1 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` ALL INNER JOIN (SELECT `__table3`.`number` * 7 AS `key` FROM numbers(100000.) AS `__table3`) AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS parallel_replicas_prefer_local_join = 1 (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 0 (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join = 0 (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
nested join with analyzer and parallel replicas, both local, both full sorting merge join
420000 420000 420000 -140000
420042 420042 420042 -140014
420084 420084 420084 -140028
420126 420126 420126 -140042
420168 420168 420168 -140056
420210 420210 420210 -140070
420252 420252 420252 -140084
420294 420294 420294 -140098
420336 420336 420336 -140112
420378 420378 420378 -140126
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS join_algorithm = 'full_sorting_merge' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS join_algorithm = 'full_sorting_merge' (stage: WithMergeableState)
<Debug> WithOrderCoordinator: Coordination done
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, parallel_replicas_prefer_local_join = 0, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm = 'full_sorting_merge' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, parallel_replicas_prefer_local_join = 0, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm = 'full_sorting_merge' (stage: WithMergeableState)
<Debug> WithOrderCoordinator: Coordination done
nested join with analyzer and parallel replicas, both local, inner hash join + outer full sorting merge join
420000 420000 420000 -140000
420042 420042 420042 -140014
420084 420084 420084 -140028
420126 420126 420126 -140042
420168 420168 420168 -140056
420210 420210 420210 -140070
420252 420252 420252 -140084
420294 420294 420294 -140098
420336 420336 420336 -140112
420378 420378 420378 -140126
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS join_algorithm = 'hash' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS join_algorithm = 'hash' (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, parallel_replicas_prefer_local_join = 0, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm = 'full_sorting_merge' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, parallel_replicas_prefer_local_join = 0, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm = 'full_sorting_merge' (stage: WithMergeableState)
<Debug> WithOrderCoordinator: Coordination done
nested join with analyzer and parallel replicas, both local, inner full sorting merge join + outer hash join
420000 420000 420000 -140000
420042 420042 420042 -140014
420084 420084 420084 -140028
420126 420126 420126 -140042
420168 420168 420168 -140056
420210 420210 420210 -140070
420252 420252 420252 -140084
420294 420294 420294 -140098
420336 420336 420336 -140112
420378 420378 420378 -140126
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS join_algorithm = 'full_sorting_merge' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value` FROM `default`.`num_2` AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table2` ON `__table1`.`key` = `__table2`.`key` SETTINGS join_algorithm = 'full_sorting_merge' (stage: WithMergeableState)
<Debug> WithOrderCoordinator: Coordination done
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, parallel_replicas_prefer_local_join = 0, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm = 'hash' (stage: WithMergeableState)
SELECT `__table1`.`key` AS `key`, `__table1`.`value` AS `value`, `__table3`.`key` AS `r.key`, `__table3`.`value` AS `r.value` FROM (SELECT `__table2`.`key` AS `key`, `__table2`.`value` AS `value` FROM `default`.`num_1` AS `__table2`) AS `__table1` GLOBAL ALL INNER JOIN `_data_` AS `__table3` ON `__table1`.`key` = `__table3`.`key` ORDER BY `__table1`.`key` ASC LIMIT _CAST(10000, 'UInt64'), _CAST(10, 'UInt64') SETTINGS allow_experimental_analyzer = 1, parallel_replicas_prefer_local_join = 0, send_logs_level = 'trace', allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm = 'hash' (stage: WithMergeableState)
<Debug> DefaultCoordinator: Coordination done

View File

@ -0,0 +1,263 @@
#!/usr/bin/env bash
# Tags: long, no-random-settings, no-random-merge-tree-settings
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nm -q "
drop table if exists num_1;
drop table if exists num_2;
create table num_1 (key UInt64, value String) engine = MergeTree order by key;
create table num_2 (key UInt64, value Int64) engine = MergeTree order by key;
insert into num_1 select number * 2, toString(number * 2) from numbers(1e7);
insert into num_2 select number * 3, -number from numbers(1.5e6);
"
##############
echo
echo "simple join with analyzer"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2) r on l.key = r.key
order by l.key limit 10 offset 700000
SETTINGS allow_experimental_analyzer=1"
##############
echo
echo "simple (global) join with analyzer and parallel replicas"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2) r on l.key = r.key
order by l.key limit 10 offset 700000
SETTINGS allow_experimental_analyzer=1, allow_experimental_parallel_reading_from_replicas = 2,
max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=0"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2) r on l.key = r.key
order by l.key limit 10 offset 700000
SETTINGS allow_experimental_analyzer=1, allow_experimental_parallel_reading_from_replicas = 2, send_logs_level='trace',
max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=0" 2>&1 |
grep "executeQuery\|<Debug>.*Coordinator: Coordination done" |
grep -o "SELECT.*WithMergeableState)\|<Debug>.*Coordinator: Coordination done" |
sed -re 's/_data_[[:digit:]]+_[[:digit:]]+/_data_/g'
##############
echo
echo "simple (local) join with analyzer and parallel replicas"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2) r on l.key = r.key
order by l.key limit 10 offset 700000
SETTINGS allow_experimental_analyzer=1,
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=1"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2) r on l.key = r.key
order by l.key limit 10 offset 700000
SETTINGS allow_experimental_analyzer=1, send_logs_level='trace',
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=1" 2>&1 |
grep "executeQuery\|<Debug>.*Coordinator: Coordination done" |
grep -o "SELECT.*WithMergeableState)\|<Debug>.*Coordinator: Coordination done" |
sed -re 's/_data_[[:digit:]]+_[[:digit:]]+/_data_/g'
##############
echo
echo "simple (local) join with analyzer and parallel replicas and full sorting merge join"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2) r on l.key = r.key
order by l.key limit 10 offset 700000
SETTINGS allow_experimental_analyzer=1, join_algorithm='full_sorting_merge',
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=1"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2) r on l.key = r.key
order by l.key limit 10 offset 700000
SETTINGS allow_experimental_analyzer=1, join_algorithm='full_sorting_merge', send_logs_level='trace',
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=1" 2>&1 |
grep "executeQuery\|<Debug>.*Coordinator: Coordination done" |
grep -o "SELECT.*WithMergeableState)\|<Debug>.*Coordinator: Coordination done" |
sed -re 's/_data_[[:digit:]]+_[[:digit:]]+/_data_/g'
##############
echo
echo "nested join with analyzer"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2 inner join
(select number * 7 as key from numbers(1e5)) as nn on num_2.key = nn.key settings parallel_replicas_prefer_local_join=1) r
on l.key = r.key order by l.key limit 10 offset 10000
SETTINGS allow_experimental_analyzer=1"
##############
echo
echo "nested join with analyzer and parallel replicas, both local"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2 inner join
(select number * 7 as key from numbers(1e5)) as nn on num_2.key = nn.key settings parallel_replicas_prefer_local_join=1) r
on l.key = r.key order by l.key limit 10 offset 10000
SETTINGS allow_experimental_analyzer=1,
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=1"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2 inner join
(select number * 7 as key from numbers(1e5)) as nn on num_2.key = nn.key settings parallel_replicas_prefer_local_join=1) r
on l.key = r.key order by l.key limit 10 offset 10000
SETTINGS allow_experimental_analyzer=1, join_algorithm='full_sorting_merge', send_logs_level='trace',
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=1" 2>&1 |
grep "executeQuery\|<Debug>.*Coordinator: Coordination done" |
grep -o "SELECT.*WithMergeableState)\|<Debug>.*Coordinator: Coordination done" |
sed -re 's/_data_[[:digit:]]+_[[:digit:]]+/_data_/g'
##############
echo
echo "nested join with analyzer and parallel replicas, both global"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2 inner join
(select number * 7 as key from numbers(1e5)) as nn on num_2.key = nn.key settings parallel_replicas_prefer_local_join=0) r
on l.key = r.key order by l.key limit 10 offset 10000
SETTINGS allow_experimental_analyzer=1,
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=0"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2 inner join
(select number * 7 as key from numbers(1e5)) as nn on num_2.key = nn.key settings parallel_replicas_prefer_local_join=0) r
on l.key = r.key order by l.key limit 10 offset 10000
SETTINGS allow_experimental_analyzer=1, send_logs_level='trace',
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=0" 2>&1 |
grep "executeQuery\|<Debug>.*Coordinator: Coordination done" |
grep -o "SELECT.*WithMergeableState)\|<Debug>.*Coordinator: Coordination done" |
sed -re 's/_data_[[:digit:]]+_[[:digit:]]+/_data_/g'
##############
echo
echo "nested join with analyzer and parallel replicas, global + local"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2 inner join
(select number * 7 as key from numbers(1e5)) as nn on num_2.key = nn.key settings parallel_replicas_prefer_local_join=1) r
on l.key = r.key order by l.key limit 10 offset 10000
SETTINGS allow_experimental_analyzer=1,
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=0"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2 inner join
(select number * 7 as key from numbers(1e5)) as nn on num_2.key = nn.key settings parallel_replicas_prefer_local_join=1) r
on l.key = r.key order by l.key limit 10 offset 10000
SETTINGS allow_experimental_analyzer=1, send_logs_level='trace',
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_prefer_local_join=0" 2>&1 |
grep "executeQuery\|<Debug>.*Coordinator: Coordination done" |
grep -o "SELECT.*WithMergeableState)\|<Debug>.*Coordinator: Coordination done" |
sed -re 's/_data_[[:digit:]]+_[[:digit:]]+/_data_/g'
##############
echo
echo "nested join with analyzer and parallel replicas, both local, both full sorting merge join"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2 inner join
(select number * 7 as key from numbers(1e5)) as nn on num_2.key = nn.key settings join_algorithm='full_sorting_merge') r
on l.key = r.key order by l.key limit 10 offset 10000
SETTINGS allow_experimental_analyzer=1, parallel_replicas_prefer_local_join=0,
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm='full_sorting_merge'"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2 inner join
(select number * 7 as key from numbers(1e5)) as nn on num_2.key = nn.key settings join_algorithm='full_sorting_merge') r
on l.key = r.key order by l.key limit 10 offset 10000
SETTINGS allow_experimental_analyzer=1, parallel_replicas_prefer_local_join=0, send_logs_level='trace',
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm='full_sorting_merge'" 2>&1 |
grep "executeQuery\|<Debug>.*Coordinator: Coordination done" |
grep -o "SELECT.*WithMergeableState)\|<Debug>.*Coordinator: Coordination done" |
sed -re 's/_data_[[:digit:]]+_[[:digit:]]+/_data_/g'
##############
echo
echo "nested join with analyzer and parallel replicas, both local, both full sorting and hash join"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2 inner join
(select number * 7 as key from numbers(1e5)) as nn on num_2.key = nn.key settings join_algorithm='hash') r
on l.key = r.key order by l.key limit 10 offset 10000
SETTINGS allow_experimental_analyzer=1, parallel_replicas_prefer_local_join=0,
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm='full_sorting_merge'"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2 inner join
(select number * 7 as key from numbers(1e5)) as nn on num_2.key = nn.key settings join_algorithm='hash') r
on l.key = r.key order by l.key limit 10 offset 10000
SETTINGS allow_experimental_analyzer=1, parallel_replicas_prefer_local_join=0, send_logs_level='trace',
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm='full_sorting_merge'" 2>&1 |
grep "executeQuery\|<Debug>.*Coordinator: Coordination done" |
grep -o "SELECT.*WithMergeableState)\|<Debug>.*Coordinator: Coordination done" |
sed -re 's/_data_[[:digit:]]+_[[:digit:]]+/_data_/g'
##############
echo
echo "nested join with analyzer and parallel replicas, both local, both full sorting and hash join"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2 inner join
(select number * 7 as key from numbers(1e5)) as nn on num_2.key = nn.key settings join_algorithm='full_sorting_merge') r
on l.key = r.key order by l.key limit 10 offset 10000
SETTINGS allow_experimental_analyzer=1, parallel_replicas_prefer_local_join=0,
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm='hash'"
$CLICKHOUSE_CLIENT -q "
select * from (select key, value from num_1) l
inner join (select key, value from num_2 inner join
(select number * 7 as key from numbers(1e5)) as nn on num_2.key = nn.key settings join_algorithm='full_sorting_merge') r
on l.key = r.key order by l.key limit 10 offset 10000
SETTINGS allow_experimental_analyzer=1, parallel_replicas_prefer_local_join=0, send_logs_level='trace',
allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', join_algorithm='hash'" 2>&1 |
grep "executeQuery\|<Debug>.*Coordinator: Coordination done" |
grep -o "SELECT.*WithMergeableState)\|<Debug>.*Coordinator: Coordination done" |
sed -re 's/_data_[[:digit:]]+_[[:digit:]]+/_data_/g'

View File

@ -0,0 +1,502 @@
-- { echoOn }
set parallel_replicas_prefer_local_join = 0;
-- A query with only INNER/LEFT joins is fully sent to replicas. JOIN is executed in GLOBAL mode.
select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
4 4 0 0 0 0
5 5 0 0 0 0
6 6 6 6 0 0
7 7 0 0 0 0
8 8 8 8 0 0
9 9 0 0 0 0
10 10 10 10 0 0
11 11 0 0 0 0
12 12 12 12 12 12
13 13 0 0 0 0
14 14 14 14 0 0
15 15 0 0 0 0
explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
Expression
ReadFromRemoteParallelReplicas
--
-- The same query with CTEs;
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
4 4 0 0 0 0
5 5 0 0 0 0
6 6 6 6 0 0
7 7 0 0 0 0
8 8 8 8 0 0
9 9 0 0 0 0
10 10 10 10 0 0
11 11 0 0 0 0
12 12 12 12 12 12
13 13 0 0 0 0
14 14 14 14 0 0
15 15 0 0 0 0
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
Expression
Sorting
Expression
ReadFromRemoteParallelReplicas
--
-- GROUP BY should work up to WithMergeableState
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
54 54 50 50 12 12 0
64 64 0 0 0 0 1
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
Expression
Sorting
Expression
MergingAggregated
Expression
ReadFromRemoteParallelReplicas
--
-- ORDER BY in sub3: sub1 is fully pushed, sub3 -> WithMergeableState
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
4 4 0 0 0 0
5 5 0 0 0 0
6 6 6 6 0 0
7 7 0 0 0 0
8 8 8 8 0 0
9 9 0 0 0 0
10 10 10 10 0 0
11 11 0 0 0 0
12 12 12 12 12 12
13 13 0 0 0 0
14 14 14 14 0 0
15 15 0 0 0 0
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
Expression
Sorting
Expression
Join
Expression
ReadFromRemoteParallelReplicas
Expression
ReadFromRemoteParallelReplicas
--
-- ORDER BY in sub1: sub1 -> WithMergeableState
with sub1 as (select x, y from tab1 where x != 2 order by y),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
4 4 0 0 0 0
5 5 0 0 0 0
6 6 6 6 0 0
7 7 0 0 0 0
8 8 8 8 0 0
9 9 0 0 0 0
10 10 10 10 0 0
11 11 0 0 0 0
12 12 12 12 12 12
13 13 0 0 0 0
14 14 14 14 0 0
15 15 0 0 0 0
explain description=0
with sub1 as (select x, y from tab1 where x != 2 order by y),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
Expression
Sorting
Expression
Join
Expression
Join
Expression
ReadFromRemoteParallelReplicas
Expression
ReadFromRemoteParallelReplicas
Expression
ReadFromRemoteParallelReplicas
--
-- RIGHT JOIN in sub3: sub3 -> WithMergeableState
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
0 0 0 0 0 0
6 6 6 6 0 0
8 8 8 8 0 0
10 10 10 10 0 0
12 12 12 12 12 12
14 14 14 14 0 0
4 4 0 0 0 0
3 3 0 0 0 0
5 5 0 0 0 0
1 1 0 0 0 0
7 7 0 0 0 0
9 9 0 0 0 0
15 15 0 0 0 0
11 11 0 0 0 0
13 13 0 0 0 0
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
Expression
Join
Expression
Join
Expression
ReadFromRemoteParallelReplicas
Expression
ReadFromRemoteParallelReplicas
Expression
ReadFromRemoteParallelReplicas
--
-- RIGHT JOIN in sub5: sub5 -> WithMergeableState
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
0 0 0 0 0 0
0 0 1 1 0 0
0 0 3 3 0 0
0 0 4 4 0 0
0 0 5 5 0 0
0 0 6 6 6 6
0 0 7 7 0 0
0 0 8 8 8 8
0 0 9 9 0 0
0 0 10 10 10 10
0 0 11 11 0 0
12 12 12 12 12 12
0 0 13 13 0 0
0 0 14 14 14 14
0 0 15 15 0 0
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
-- { echoOn }
Expression
Sorting
Expression
Join
Expression
ReadFromRemoteParallelReplicas
Expression
Join
Expression
ReadFromRemoteParallelReplicas
Expression
ReadFromRemoteParallelReplicas
set parallel_replicas_prefer_local_join = 1;
-- A query with only INNER/LEFT joins is fully sent to replicas. JOIN is executed in GLOBAL mode.
select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
4 4 0 0 0 0
5 5 0 0 0 0
6 6 6 6 0 0
7 7 0 0 0 0
8 8 8 8 0 0
9 9 0 0 0 0
10 10 10 10 0 0
11 11 0 0 0 0
12 12 12 12 12 12
13 13 0 0 0 0
14 14 14 14 0 0
15 15 0 0 0 0
explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
Expression
ReadFromRemoteParallelReplicas
--
-- The same query with CTEs;
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
4 4 0 0 0 0
5 5 0 0 0 0
6 6 6 6 0 0
7 7 0 0 0 0
8 8 8 8 0 0
9 9 0 0 0 0
10 10 10 10 0 0
11 11 0 0 0 0
12 12 12 12 12 12
13 13 0 0 0 0
14 14 14 14 0 0
15 15 0 0 0 0
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
Expression
Sorting
Expression
ReadFromRemoteParallelReplicas
--
-- GROUP BY should work up to WithMergeableState
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
54 54 50 50 12 12 0
64 64 0 0 0 0 1
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
Expression
Sorting
Expression
MergingAggregated
Expression
ReadFromRemoteParallelReplicas
--
-- ORDER BY in sub3: sub1 is fully pushed, sub3 -> WithMergeableState
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
4 4 0 0 0 0
5 5 0 0 0 0
6 6 6 6 0 0
7 7 0 0 0 0
8 8 8 8 0 0
9 9 0 0 0 0
10 10 10 10 0 0
11 11 0 0 0 0
12 12 12 12 12 12
13 13 0 0 0 0
14 14 14 14 0 0
15 15 0 0 0 0
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
Expression
Sorting
Expression
Join
Expression
ReadFromRemoteParallelReplicas
Expression
ReadFromRemoteParallelReplicas
--
-- ORDER BY in sub1: sub1 -> WithMergeableState
with sub1 as (select x, y from tab1 where x != 2 order by y),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
0 0 0 0 0 0
1 1 0 0 0 0
3 3 0 0 0 0
4 4 0 0 0 0
5 5 0 0 0 0
6 6 6 6 0 0
7 7 0 0 0 0
8 8 8 8 0 0
9 9 0 0 0 0
10 10 10 10 0 0
11 11 0 0 0 0
12 12 12 12 12 12
13 13 0 0 0 0
14 14 14 14 0 0
15 15 0 0 0 0
explain description=0
with sub1 as (select x, y from tab1 where x != 2 order by y),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
Expression
Sorting
Expression
Join
Expression
Join
Expression
ReadFromRemoteParallelReplicas
Expression
ReadFromRemoteParallelReplicas
Expression
ReadFromRemoteParallelReplicas
--
-- RIGHT JOIN in sub3: sub3 -> WithMergeableState
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
0 0 0 0 0 0
6 6 6 6 0 0
8 8 8 8 0 0
10 10 10 10 0 0
12 12 12 12 12 12
14 14 14 14 0 0
4 4 0 0 0 0
3 3 0 0 0 0
5 5 0 0 0 0
1 1 0 0 0 0
7 7 0 0 0 0
9 9 0 0 0 0
15 15 0 0 0 0
11 11 0 0 0 0
13 13 0 0 0 0
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
Expression
Join
Expression
Join
Expression
ReadFromRemoteParallelReplicas
Expression
ReadFromRemoteParallelReplicas
Expression
ReadFromRemoteParallelReplicas
--
-- RIGHT JOIN in sub5: sub5 -> WithMergeableState
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
0 0 0 0 0 0
0 0 1 1 0 0
0 0 3 3 0 0
0 0 4 4 0 0
0 0 5 5 0 0
0 0 6 6 6 6
0 0 7 7 0 0
0 0 8 8 8 8
0 0 9 9 0 0
0 0 10 10 10 10
0 0 11 11 0 0
12 12 12 12 12 12
0 0 13 13 0 0
0 0 14 14 14 14
0 0 15 15 0 0
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
Expression
Sorting
Expression
Join
Expression
ReadFromRemoteParallelReplicas
Expression
Join
Expression
ReadFromRemoteParallelReplicas
Expression
ReadFromRemoteParallelReplicas

View File

@ -0,0 +1,129 @@
drop table if exists tab1;
drop table if exists tab2;
drop table if exists tab3;
create table tab1 (x UInt32, y UInt32, shard UInt32) engine = MergeTree order by shard;
create table tab2 (y UInt32, z UInt32) engine = MergeTree order by tuple();
create table tab3 (z UInt32, a UInt32) engine = MergeTree order by tuple();
insert into tab1 select number, number, number from numbers(16);
insert into tab2 select number * 2, number * 2 from numbers(8);
insert into tab3 select number * 4, number * 4 from numbers(4);
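-- Key overlap shrinks at each join level: tab1 has 0..15, tab2 even values up to 14,
-- tab3 multiples of 4 up to 12.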
{% for prefer_local_join in [0, 1] -%}
-- { echoOn }
set parallel_replicas_prefer_local_join = {{prefer_local_join}};
-- A query with only INNER/LEFT joins is fully sent to replicas. JOIN is executed in GLOBAL mode.
select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
explain description=0 select x, y, r.y, z, rr.z, a from (select l.x, l.y, r.y, r.z as z from (select x, y from tab1 where x != 2) l any left join (select y, z from tab2 where y != 4) r on l.y = r.y) ll any left join (select z, a from tab3 where z != 8) rr on ll.z = rr.z SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
--
-- The same query with CTEs;
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
--
-- GROUP BY should work up to WithMergeableState
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select sum(x), sum(y), sum(r.y), sum(z), sum(rr.z), sum(a), key from sub3 ll any left join sub4 rr on ll.z = rr.z group by x % 2 as key)
select * from sub5 order by key
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
--
-- ORDER BY in sub3: sub1 is fully pushed, sub3 -> WithMergeableState
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y order by l.x),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
--
-- ORDER BY in sub1: sub1 -> WithMergeableState
with sub1 as (select x, y from tab1 where x != 2 order by y),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
explain description=0
with sub1 as (select x, y from tab1 where x != 2 order by y),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, y, r.y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5 order by x
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
--
-- RIGHT JOIN in sub3: sub3 -> WithMergeableState
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub2 r any right join sub1 l on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select x, l.y, y, z, rr.z, a from sub3 ll any left join sub4 rr on ll.z = rr.z)
select * from sub5
SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
--
-- RIGHT JOIN in sub5: sub5 -> WithMergeableState
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
explain description=0
with sub1 as (select x, y from tab1 where x != 2),
sub2 as (select y, z from tab2 where y != 4),
sub3 as (select l.x, l.y, r.y, r.z as z from sub1 l any left join sub2 r on l.y = r.y),
sub4 as (select z, a from tab3 where z != 8),
sub5 as (select z, a, x, y, r.y, ll.z from sub4 rr any right join sub3 ll on ll.z = rr.z)
select * from sub5 order by x SETTINGS allow_experimental_parallel_reading_from_replicas = 2, max_parallel_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, prefer_localhost_replica = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', allow_experimental_analyzer=1;
{%- endfor %}