Local plan for parallel replicas: save

Igor Nikonov 2024-05-22 21:28:33 +00:00
parent b47eb39d56
commit fa8aafa942
9 changed files with 182 additions and 29 deletions

View File

@@ -25,6 +25,7 @@
#include <Storages/buildQueryTreeForShard.h>
#include <Planner/Utils.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Processors/QueryPlan/ParallelReplicasLocalPlan.h>
namespace DB
{
@@ -403,7 +404,8 @@ void executeQueryWithParallelReplicas(
QueryProcessingStage::Enum processed_stage,
const ASTPtr & query_ast,
ContextPtr context,
std::shared_ptr<const StorageLimitsList> storage_limits)
std::shared_ptr<const StorageLimitsList> storage_limits,
QueryPlanStepPtr read_from_merge_tree)
{
const auto & settings = context->getSettingsRef();
@@ -486,21 +488,66 @@ void executeQueryWithParallelReplicas(
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(
new_cluster->getShardsInfo().begin()->getAllNodeCount(), settings.parallel_replicas_mark_segment_size);
auto external_tables = new_context->getExternalTables();
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
query_ast,
new_cluster,
storage_id,
std::move(coordinator),
header,
processed_stage,
new_context,
getThrottler(new_context),
std::move(scalars),
std::move(external_tables),
getLogger("ReadFromParallelRemoteReplicasStep"),
std::move(storage_limits));
query_plan.addStep(std::move(read_from_remote));
if (settings.allow_experimental_analyzer)
{
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
query_ast,
new_cluster,
storage_id,
std::move(coordinator),
header,
processed_stage,
new_context,
getThrottler(new_context),
std::move(scalars),
std::move(external_tables),
getLogger("ReadFromParallelRemoteReplicasStep"),
std::move(storage_limits),
/*exclude_local_replica*/ true);
auto remote_plan = std::make_unique<QueryPlan>();
remote_plan->addStep(std::move(read_from_remote));
auto local_plan = createLocalPlanForParallelReplicas(
query_ast,
header,
new_context,
processed_stage,
coordinator,
std::move(read_from_merge_tree),
/*has_missing_objects=*/false);
DataStreams input_streams;
input_streams.reserve(2);
input_streams.emplace_back(local_plan->getCurrentDataStream());
input_streams.emplace_back(remote_plan->getCurrentDataStream());
std::vector<QueryPlanPtr> plans;
plans.emplace_back(std::move(local_plan));
plans.emplace_back(std::move(remote_plan));
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
query_plan.unitePlans(std::move(union_step), std::move(plans));
}
else
{
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
query_ast,
new_cluster,
storage_id,
std::move(coordinator),
header,
processed_stage,
new_context,
getThrottler(new_context),
std::move(scalars),
std::move(external_tables),
getLogger("ReadFromParallelRemoteReplicasStep"),
std::move(storage_limits),
/*exclude_local_replica*/ false);
query_plan.addStep(std::move(read_from_remote));
}
}
void executeQueryWithParallelReplicas(
@@ -510,7 +557,8 @@ void executeQueryWithParallelReplicas(
const QueryTreeNodePtr & query_tree,
const PlannerContextPtr & planner_context,
ContextPtr context,
std::shared_ptr<const StorageLimitsList> storage_limits)
std::shared_ptr<const StorageLimitsList> storage_limits,
QueryPlanStepPtr read_from_merge_tree)
{
QueryTreeNodePtr modified_query_tree = query_tree->clone();
rewriteJoinToGlobalJoin(modified_query_tree, context);
@@ -520,7 +568,8 @@ void executeQueryWithParallelReplicas(
= InterpreterSelectQueryAnalyzer::getSampleBlock(modified_query_tree, context, SelectQueryOptions(processed_stage).analyze());
auto modified_query_ast = queryNodeToDistributedSelectQuery(modified_query_tree);
executeQueryWithParallelReplicas(query_plan, storage_id, header, processed_stage, modified_query_ast, context, storage_limits);
executeQueryWithParallelReplicas(
query_plan, storage_id, header, processed_stage, modified_query_ast, context, storage_limits, std::move(read_from_merge_tree));
}
void executeQueryWithParallelReplicas(

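Note on the hunk above: with allow_experimental_analyzer enabled, executeQueryWithParallelReplicas now builds two sub-plans — a local plan from createLocalPlanForParallelReplicas for the initiator replica and a ReadFromParallelRemoteReplicasStep plan (with exclude_local_replica = true) for the remaining replicas — and unites them under a single UnionStep. The sketch below isolates that uniting pattern; the helper name unitePlansSketch is illustrative, while the UnionStep and QueryPlan::unitePlans calls mirror the hunk above.

#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/UnionStep.h>

namespace DB
{

/// Illustrative helper (hypothetical name): unite a local and a remote plan into one,
/// as done in the analyzer branch of executeQueryWithParallelReplicas() above.
/// Both plans are expected to produce the same header after converting actions.
QueryPlanPtr unitePlansSketch(QueryPlanPtr local_plan, QueryPlanPtr remote_plan)
{
    DataStreams input_streams;
    input_streams.reserve(2);
    input_streams.emplace_back(local_plan->getCurrentDataStream());
    input_streams.emplace_back(remote_plan->getCurrentDataStream());

    std::vector<QueryPlanPtr> plans;
    plans.emplace_back(std::move(local_plan));
    plans.emplace_back(std::move(remote_plan));

    auto united = std::make_unique<QueryPlan>();
    auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
    united->unitePlans(std::move(union_step), std::move(plans));
    return united;
}

}

Excluding the local replica from the remote step means the initiator's share of the work is handled by the local plan instead of a connection to itself, while the non-analyzer path keeps the previous behaviour unchanged.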
View File

@@ -30,6 +30,9 @@ using QueryTreeNodePtr = std::shared_ptr<IQueryTreeNode>;
class PlannerContext;
using PlannerContextPtr = std::shared_ptr<PlannerContext>;
class IQueryPlanStep;
using QueryPlanStepPtr = std::unique_ptr<IQueryPlanStep>;
namespace ClusterProxy
{
@@ -73,7 +76,8 @@ void executeQueryWithParallelReplicas(
QueryProcessingStage::Enum processed_stage,
const ASTPtr & query_ast,
ContextPtr context,
std::shared_ptr<const StorageLimitsList> storage_limits);
std::shared_ptr<const StorageLimitsList> storage_limits,
QueryPlanStepPtr read_from_merge_tree = nullptr);
void executeQueryWithParallelReplicas(
QueryPlan & query_plan,
@@ -90,7 +94,8 @@ void executeQueryWithParallelReplicas(
const QueryTreeNodePtr & query_tree,
const PlannerContextPtr & planner_context,
ContextPtr context,
std::shared_ptr<const StorageLimitsList> storage_limits);
std::shared_ptr<const StorageLimitsList> storage_limits,
QueryPlanStepPtr read_from_merge_tree);
}
}

View File

@@ -934,6 +934,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
{
from_stage = QueryProcessingStage::WithMergeableState;
QueryPlan query_plan_parallel_replicas;
QueryPlanStepPtr reading_step = std::move(node->step);
ClusterProxy::executeQueryWithParallelReplicas(
query_plan_parallel_replicas,
storage->getStorageID(),
@@ -941,7 +942,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
table_expression_query_info.query_tree,
table_expression_query_info.planner_context,
query_context,
table_expression_query_info.storage_limits);
table_expression_query_info.storage_limits,
std::move(reading_step));
query_plan = std::move(query_plan_parallel_replicas);
const Block & query_plan_header = query_plan.getCurrentDataStream().header;

View File

@@ -1,8 +1,6 @@
#include <Processors/QueryPlan/DistributedCreateLocalPlan.h>
#include <Common/config_version.h>
#include <Common/checkStackSize.h>
#include <Core/ProtocolDefines.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>

View File

@@ -1,17 +1,12 @@
#pragma once
#include <Core/QueryProcessingStage.h>
#include <Core/UUID.h>
#include <Parsers/IAST_fwd.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/ResizeProcessor.h>
namespace DB
{
class PreparedSets;
using PreparedSetsPtr = std::shared_ptr<PreparedSets>;
std::unique_ptr<QueryPlan> createLocalPlan(
const ASTPtr & query_ast,
const Block & header,

View File

@@ -0,0 +1,78 @@
#include <Processors/QueryPlan/ParallelReplicasLocalPlan.h>
#include <Common/checkStackSize.h>
#include <Interpreters/Context.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/ISourceStep.h>
#include <Interpreters/StorageID.h>
namespace DB
{
namespace
{
void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missing_objects)
{
if (blocksHaveEqualStructure(plan.getCurrentDataStream().header, header))
return;
auto mode = has_missing_objects ? ActionsDAG::MatchColumnsMode::Position : ActionsDAG::MatchColumnsMode::Name;
auto get_converting_dag = [mode](const Block & block_, const Block & header_)
{
/// Convert the header structure to the expected one.
/// Also, ignore constants in the result and replace them with constants from the header.
/// This is needed for functions like `now64()` or `randConstant()`, because their values may be different.
return ActionsDAG::makeConvertingActions(
block_.getColumnsWithTypeAndName(),
header_.getColumnsWithTypeAndName(),
mode,
true);
};
auto convert_actions_dag = get_converting_dag(plan.getCurrentDataStream().header, header);
auto converting = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), convert_actions_dag);
plan.addStep(std::move(converting));
}
}
std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
const ASTPtr & query_ast,
const Block & header,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
ParallelReplicasReadingCoordinatorPtr /*coordinator*/,
QueryPlanStepPtr /*read_from_merge_tree*/,
bool has_missing_objects)
{
checkStackSize();
auto query_plan = std::make_unique<QueryPlan>();
auto new_context = Context::createCopy(context);
/// Do not push down the limit to the local plan, as it would break the `rows_before_limit_at_least` counter.
if (processed_stage == QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit)
processed_stage = QueryProcessingStage::WithMergeableStateAfterAggregation;
/// Do not apply AST optimizations: the query
/// is already optimized, and some optimizations
/// can be applied only to non-distributed tables,
/// so we could produce a query inconsistent with the remote plans.
auto select_query_options = SelectQueryOptions(processed_stage).ignoreASTOptimizations();
/// For the Analyzer, an identifier in the GROUP BY/ORDER BY/LIMIT BY lists has already been resolved to
/// a ConstantNode in the QueryTree if it is an alias of a constant, so we should not replace the
/// ConstantNode with a ProjectionNode again (https://github.com/ClickHouse/ClickHouse/issues/62289).
new_context->setSetting("enable_positional_arguments", Field(false));
auto interpreter = InterpreterSelectQueryAnalyzer(query_ast, new_context, select_query_options);
query_plan = std::make_unique<QueryPlan>(std::move(interpreter).extractQueryPlan());
addConvertingActions(*query_plan, header, has_missing_objects);
return query_plan;
}
}

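In this commit the new helper simply re-plans the same query on the initiator with the analyzer at the given processing stage (AST optimizations and positional arguments disabled) and appends converting actions so the local header matches the one expected from the remote replicas; the coordinator and read_from_merge_tree arguments are accepted but not yet consumed. Below is a minimal, hypothetical call site — the function and parameter values are assumptions for illustration, the signature is the one declared in the new header:

#include <Core/Block.h>
#include <Interpreters/Context_fwd.h>
#include <Processors/QueryPlan/ParallelReplicasLocalPlan.h>

namespace DB
{

/// Hypothetical call site, mirroring the analyzer branch of executeQueryWithParallelReplicas():
/// build the plan the initiator executes itself, at the same stage as the remote replicas.
QueryPlanPtr buildLocalReplicaPlanSketch(
    const ASTPtr & query_ast,
    const Block & header,
    ContextPtr context,
    ParallelReplicasReadingCoordinatorPtr coordinator)
{
    return createLocalPlanForParallelReplicas(
        query_ast,
        header,
        context,
        QueryProcessingStage::WithMergeableState,
        coordinator,
        /*read_from_merge_tree=*/nullptr,   /// not consumed yet in this commit
        /*has_missing_objects=*/false);
}

}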
View File

@@ -0,0 +1,19 @@
#pragma once
#include <Core/QueryProcessingStage.h>
#include <Parsers/IAST_fwd.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
namespace DB
{
std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
const ASTPtr & query_ast,
const Block & header,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
ParallelReplicasReadingCoordinatorPtr coordinator,
QueryPlanStepPtr read_from_merge_tree,
bool has_missing_objects);
}

View File

@@ -369,7 +369,8 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
Scalars scalars_,
Tables external_tables_,
LoggerPtr log_,
std::shared_ptr<const StorageLimitsList> storage_limits_)
std::shared_ptr<const StorageLimitsList> storage_limits_,
bool exclude_local_replica_)
: ISourceStep(DataStream{.header = std::move(header_)})
, cluster(cluster_)
, query_ast(query_ast_)
@@ -382,6 +383,7 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
, external_tables{external_tables_}
, storage_limits(std::move(storage_limits_))
, log(log_)
, exclude_local_replica(exclude_local_replica_)
{
chassert(cluster->getShardCount() == 1);
@@ -410,6 +412,9 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
const auto & shard = cluster->getShardsInfo().at(0);
size_t all_replicas_count = current_settings.max_parallel_replicas;
if (exclude_local_replica)
--all_replicas_count;
if (all_replicas_count > shard.getAllNodeCount())
{
LOG_INFO(

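The effect of exclude_local_replica on stream planning, as a worked example: with max_parallel_replicas = 3, the step now plans streams for only 2 remote replicas, since the initiator's share is covered by the local plan united above. A small self-contained sketch of that arithmetic follows; the helper name and the final clamp against the shard's node count are assumptions based on the surrounding code:

#include <algorithm>
#include <cstddef>

/// Illustrative sketch (hypothetical helper): how many remote replica streams to create.
/// When the local replica is excluded, one slot is reserved for the local plan,
/// and the result is clamped to the number of nodes actually available in the shard.
size_t remoteReplicasToUse(size_t max_parallel_replicas, size_t shard_node_count, bool exclude_local_replica)
{
    size_t all_replicas_count = max_parallel_replicas;
    if (exclude_local_replica)
        --all_replicas_count;
    return std::min(all_replicas_count, shard_node_count);
}

/// Example: remoteReplicasToUse(3, 3, /*exclude_local_replica=*/true) == 2.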
View File

@@ -78,7 +78,8 @@ public:
Scalars scalars_,
Tables external_tables_,
LoggerPtr log_,
std::shared_ptr<const StorageLimitsList> storage_limits_);
std::shared_ptr<const StorageLimitsList> storage_limits_,
bool exclude_local_replica = false);
String getName() const override { return "ReadFromRemoteParallelReplicas"; }
@@ -101,6 +102,7 @@ private:
Tables external_tables;
std::shared_ptr<const StorageLimitsList> storage_limits;
LoggerPtr log;
bool exclude_local_replica;
};
}