Right JOIN with parallel replicas

This commit is contained in:
Igor Nikonov 2024-10-25 20:55:51 +00:00
parent 6f8e953bd9
commit c952d9d815
4 changed files with 86 additions and 34 deletions

View File

@ -477,8 +477,8 @@ void executeQueryWithParallelReplicas(
QueryPlanStepPtr analyzed_read_from_merge_tree)
{
auto logger = getLogger("executeQueryWithParallelReplicas");
LOG_DEBUG(logger, "Executing read from {}, header {}, query ({}), stage {} with parallel replicas",
storage_id.getNameForLogs(), header.dumpStructure(), query_ast->formatForLogging(), processed_stage);
LOG_DEBUG(logger, "Executing read from {}, header {}, query ({}), stage {} with parallel replicas\n{}",
storage_id.getNameForLogs(), header.dumpStructure(), query_ast->formatForLogging(), processed_stage, StackTrace().toString());
const auto & settings = context->getSettingsRef();

View File

@ -665,6 +665,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
bool is_single_table_expression,
bool wrap_read_columns_in_subquery)
{
LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "table_expression:\n{}", table_expression->dumpTree());
const auto & query_context = planner_context->getQueryContext();
const auto & settings = query_context->getSettingsRef();

View File

@ -100,14 +100,19 @@ std::stack<const QueryNode *> getSupportingParallelReplicasQuery(const IQueryTre
auto join_kind = join_node.getKind();
auto join_strictness = join_node.getStrictness();
bool can_parallelize_join =
join_kind == JoinKind::Left
|| (join_kind == JoinKind::Inner && join_strictness == JoinStrictness::All);
if (!can_parallelize_join)
if (join_kind == JoinKind::Left || (join_kind == JoinKind::Inner && join_strictness == JoinStrictness::All))
{
query_tree_node = join_node.getLeftTableExpression().get();
}
else if (join_kind == JoinKind::Right)
{
query_tree_node = join_node.getRightTableExpression().get();
}
else
{
return {};
}
query_tree_node = join_node.getLeftTableExpression().get();
break;
}
default:
@ -310,13 +315,15 @@ const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tr
static const TableNode * findTableForParallelReplicas(const IQueryTreeNode * query_tree_node)
{
std::stack<const IQueryTreeNode *> right_join_nodes;
while (query_tree_node || !right_join_nodes.empty())
LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "\n{}", StackTrace().toString());
std::stack<const IQueryTreeNode *> join_nodes;
while (query_tree_node || !join_nodes.empty())
{
if (!query_tree_node)
{
query_tree_node = right_join_nodes.top();
right_join_nodes.pop();
query_tree_node = join_nodes.top();
join_nodes.pop();
}
auto join_tree_node_type = query_tree_node->getNodeType();
@ -365,8 +372,23 @@ static const TableNode * findTableForParallelReplicas(const IQueryTreeNode * que
case QueryTreeNodeType::JOIN:
{
const auto & join_node = query_tree_node->as<JoinNode &>();
query_tree_node = join_node.getLeftTableExpression().get();
right_join_nodes.push(join_node.getRightTableExpression().get());
const auto join_kind = join_node.getKind();
const auto join_strictness = join_node.getStrictness();
if (join_kind == JoinKind::Left || (join_kind == JoinKind::Inner and join_strictness == JoinStrictness::All))
{
query_tree_node = join_node.getLeftTableExpression().get();
join_nodes.push(join_node.getRightTableExpression().get());
}
else if (join_kind == JoinKind::Right)
{
query_tree_node = join_node.getRightTableExpression().get();
join_nodes.push(join_node.getLeftTableExpression().get());
}
else
{
return nullptr;
}
break;
}
default:
@ -400,7 +422,9 @@ const TableNode * findTableForParallelReplicas(const QueryTreeNodePtr & query_tr
if (!context->canUseParallelReplicasOnFollower())
return nullptr;
return findTableForParallelReplicas(query_tree_node.get());
const auto * res = findTableForParallelReplicas(query_tree_node.get());
LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "Table found {}", res->getStorageID().getFullTableName());
return res;
}
JoinTreeQueryPlan buildQueryPlanForParallelReplicas(
@ -408,6 +432,8 @@ JoinTreeQueryPlan buildQueryPlanForParallelReplicas(
const PlannerContextPtr & planner_context,
std::shared_ptr<const StorageLimitsList> storage_limits)
{
LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "\n{}", StackTrace().toString());
auto processed_stage = QueryProcessingStage::WithMergeableState;
auto context = planner_context->getQueryContext();

View File

@ -314,6 +314,35 @@ TableNodePtr executeSubqueryNode(const QueryTreeNodePtr & subquery_node,
return temporary_table_expression_node;
}
QueryTreeNodePtr getSubqueryFromTableExpression(
const QueryTreeNodePtr & join_table_expression,
const std::unordered_map<QueryTreeNodePtr, CollectColumnSourceToColumnsVisitor::Columns> & column_source_to_columns,
const ContextPtr & context)
{
auto join_table_expression_node_type = join_table_expression->getNodeType();
QueryTreeNodePtr subquery_node;
if (join_table_expression_node_type == QueryTreeNodeType::QUERY || join_table_expression_node_type == QueryTreeNodeType::UNION)
{
subquery_node = join_table_expression;
}
else if (
join_table_expression_node_type == QueryTreeNodeType::TABLE || join_table_expression_node_type == QueryTreeNodeType::TABLE_FUNCTION)
{
const auto & columns = column_source_to_columns.at(join_table_expression).columns;
subquery_node = buildSubqueryToReadColumnsFromTableExpression(columns, join_table_expression, context);
}
else
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Expected JOIN right table expression to be table, table function, query or union node. Actual {}",
join_table_expression->formatASTForErrorMessage());
}
return subquery_node;
}
}
QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify)
@ -335,37 +364,32 @@ QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_contex
{
if (auto * join_node = global_in_or_join_node.query_node->as<JoinNode>())
{
auto join_right_table_expression = join_node->getRightTableExpression();
auto join_right_table_expression_node_type = join_right_table_expression->getNodeType();
QueryTreeNodePtr subquery_node;
if (join_right_table_expression_node_type == QueryTreeNodeType::QUERY ||
join_right_table_expression_node_type == QueryTreeNodeType::UNION)
QueryTreeNodePtr join_table_expression;
const auto join_kind = join_node->getKind();
const auto join_strictness = join_node->getStrictness();
if (join_kind == JoinKind::Left || (join_kind == JoinKind::Inner && join_strictness == JoinStrictness::All))
{
subquery_node = join_right_table_expression;
join_table_expression = join_node->getRightTableExpression();
}
else if (join_right_table_expression_node_type == QueryTreeNodeType::TABLE ||
join_right_table_expression_node_type == QueryTreeNodeType::TABLE_FUNCTION)
else if (join_kind == JoinKind::Right)
{
const auto & columns = column_source_to_columns.at(join_right_table_expression).columns;
subquery_node = buildSubqueryToReadColumnsFromTableExpression(columns,
join_right_table_expression,
planner_context->getQueryContext());
join_table_expression = join_node->getLeftTableExpression();
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Expected JOIN right table expression to be table, table function, query or union node. Actual {}",
join_right_table_expression->formatASTForErrorMessage());
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Unexpected join kind: {}", join_kind);
}
auto subquery_node
= getSubqueryFromTableExpression(join_table_expression, column_source_to_columns, planner_context->getQueryContext());
auto temporary_table_expression_node = executeSubqueryNode(subquery_node,
planner_context->getMutableQueryContext(),
global_in_or_join_node.subquery_depth);
temporary_table_expression_node->setAlias(join_right_table_expression->getAlias());
temporary_table_expression_node->setAlias(join_table_expression->getAlias());
replacement_map.emplace(join_right_table_expression.get(), std::move(temporary_table_expression_node));
replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node));
continue;
}
if (auto * in_function_node = global_in_or_join_node.query_node->as<FunctionNode>())