Cleanup read() in MT/RMT

This commit is contained in:
Igor Nikonov 2024-05-17 14:55:15 +00:00
parent f990bb2a21
commit d8ddabeb35
2 changed files with 2 additions and 56 deletions

View File

@ -217,34 +217,8 @@ void StorageMergeTree::read(
if (local_context->canUseParallelReplicasOnInitiator() && settings.parallel_replicas_for_non_replicated_merge_tree
&& !settings.allow_experimental_analyzer)
{
ASTPtr modified_query_ast;
Block header;
if (local_context->getSettingsRef().allow_experimental_analyzer)
{
QueryTreeNodePtr modified_query_tree = query_info.query_tree->clone();
rewriteJoinToGlobalJoin(modified_query_tree, local_context);
modified_query_tree = buildQueryTreeForShard(query_info.planner_context, modified_query_tree);
header = InterpreterSelectQueryAnalyzer::getSampleBlock(
modified_query_tree, local_context, SelectQueryOptions(processed_stage).analyze());
modified_query_ast = queryNodeToDistributedSelectQuery(modified_query_tree);
}
else
{
const auto table_id = getStorageID();
modified_query_ast = ClusterProxy::rewriteSelectQuery(local_context, query_info.query,
table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr);
header
= InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
ClusterProxy::executeQueryWithParallelReplicas(
query_plan,
getStorageID(),
header,
processed_stage,
modified_query_ast,
local_context,
query_info.storage_limits);
query_plan, getStorageID(), processed_stage, query_info.query, local_context, query_info.storage_limits);
}
else
{

View File

@ -5479,36 +5479,8 @@ void StorageReplicatedMergeTree::readParallelReplicasImpl(
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage)
{
ASTPtr modified_query_ast;
Block header;
const auto table_id = getStorageID();
if (local_context->getSettingsRef().allow_experimental_analyzer)
{
QueryTreeNodePtr modified_query_tree = query_info.query_tree->clone();
rewriteJoinToGlobalJoin(modified_query_tree, local_context);
modified_query_tree = buildQueryTreeForShard(query_info.planner_context, modified_query_tree);
header = InterpreterSelectQueryAnalyzer::getSampleBlock(
modified_query_tree, local_context, SelectQueryOptions(processed_stage).analyze());
modified_query_ast = queryNodeToDistributedSelectQuery(modified_query_tree);
}
else
{
modified_query_ast = ClusterProxy::rewriteSelectQuery(local_context, query_info.query,
table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr);
header
= InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
ClusterProxy::executeQueryWithParallelReplicas(
query_plan,
table_id,
header,
processed_stage,
modified_query_ast,
local_context,
query_info.storage_limits);
query_plan, getStorageID(), processed_stage, query_info.query, local_context, query_info.storage_limits);
}
void StorageReplicatedMergeTree::readLocalImpl(