This commit is contained in:
Igor Nikonov 2024-06-25 14:31:11 +00:00
parent 96b68cb920
commit 318af3af95
6 changed files with 14 additions and 46 deletions

View File

@ -20,34 +20,7 @@
namespace DB
{
namespace
{
void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missing_objects)
{
if (blocksHaveEqualStructure(plan.getCurrentDataStream().header, header))
return;
auto mode = has_missing_objects ? ActionsDAG::MatchColumnsMode::Position : ActionsDAG::MatchColumnsMode::Name;
auto get_converting_dag = [mode](const Block & block_, const Block & header_)
{
/// Convert header structure to expected.
/// Also we ignore constants from result and replace it with constants from header.
/// It is needed for functions like `now64()` or `randConstant()` because their values may be different.
return ActionsDAG::makeConvertingActions(
block_.getColumnsWithTypeAndName(),
header_.getColumnsWithTypeAndName(),
mode,
true);
};
auto convert_actions_dag = get_converting_dag(plan.getCurrentDataStream().header, header);
auto converting = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), convert_actions_dag);
plan.addStep(std::move(converting));
}
}
void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missing_objects);
std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
const ASTPtr & query_ast,
@ -55,8 +28,7 @@ std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
ParallelReplicasReadingCoordinatorPtr coordinator,
QueryPlanStepPtr analyzed_read_from_merge_tree,
bool has_missing_objects)
QueryPlanStepPtr analyzed_read_from_merge_tree)
{
checkStackSize();
@ -112,7 +84,7 @@ std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
= reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, all_ranges_cb, read_task_cb, number_of_local_replica);
node->step = std::move(read_from_merge_tree_parallel_replicas);
addConvertingActions(*query_plan, header, has_missing_objects);
addConvertingActions(*query_plan, header, /*has_missing_objects=*/false);
return query_plan;
}

View File

@ -14,6 +14,5 @@ std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
ParallelReplicasReadingCoordinatorPtr coordinator,
QueryPlanStepPtr read_from_merge_tree,
bool has_missing_objects);
QueryPlanStepPtr read_from_merge_tree);
}

View File

@ -339,9 +339,9 @@ ReadFromMergeTree::ReadFromMergeTree(
std::unique_ptr<ReadFromMergeTree> ReadFromMergeTree::createLocalParallelReplicasReadingStep(
const ReadFromMergeTree * analyzed_merge_tree,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback_,
std::optional<MergeTreeReadTaskCallback> read_task_callback_,
std::optional<size_t> number_of_current_replica_)
std::optional<MergeTreeReadTaskCallback> read_task_callback_)
{
const auto number_of_local_replica = 0;
return std::make_unique<ReadFromMergeTree>(
prepared_parts,
alter_conversions_for_parts,
@ -356,9 +356,9 @@ std::unique_ptr<ReadFromMergeTree> ReadFromMergeTree::createLocalParallelReplica
log,
(analyzed_merge_tree ? analyzed_merge_tree->analyzed_result_ptr : nullptr),
true,
all_ranges_callback_,
read_task_callback_,
number_of_current_replica_);
all_ranges_callback,
read_task_callback,
number_of_local_replica);
}
Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_with_range, Names required_columns, PoolSettings pool_settings)

View File

@ -123,14 +123,13 @@ public:
AnalysisResultPtr analyzed_result_ptr_,
bool enable_parallel_reading_,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback_ = std::nullopt,
std::optional<MergeTreeReadTaskCallback> read_task_callback_ = std::nullopt,
std::optional<MergeTreeReadTaskCallback> read_task_callback_ = std::nullopt);
std::optional<size_t> number_of_current_replica_ = std::nullopt);
std::unique_ptr<ReadFromMergeTree> createLocalParallelReplicasReadingStep(
const ReadFromMergeTree * analyzed_merge_tree,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback_,
std::optional<MergeTreeReadTaskCallback> read_task_callback_,
std::optional<size_t> number_of_current_replica_);
std::optional<MergeTreeAllRangesCallback> all_ranges_callback,
std::optional<MergeTreeReadTaskCallback> read_task_callback);
static constexpr auto name = "ReadFromMergeTree";
String getName() const override { return name; }
@ -291,8 +290,6 @@ private:
bool enable_vertical_final = false;
bool enable_remove_parts_from_snapshot_optimization = true;
std::optional<size_t> number_of_current_replica;
friend class ReadFromMergeTreeCoordinated;
};
}

View File

@ -30,8 +30,8 @@ public:
/// needed to report total rows to read
void setProgressCallback(ProgressCallback callback);
void initialize(CoordinationMode mode);
private:
void initialize(CoordinationMode mode);
std::mutex mutex;
const size_t replicas_count{0};

View File

@ -6,7 +6,7 @@ SET allow_experimental_parallel_reading_from_replicas=2, max_parallel_replicas=1
SET send_logs_level='error';
-- with local plan for initiator, the query can be executed fast on initator, we can simply not come to the point where unavailable replica can be detected
-- therefore disable local plan for now
SELECT count() FROM test_parallel_replicas_unavailable_shards WHERE NOT ignore(*) SETTINGS log_comment = '02769_7b513191-5082-4073-8568-53b86a49da79', allow_experimental_analyzer=0;
SELECT count() FROM test_parallel_replicas_unavailable_shards WHERE NOT ignore(*) SETTINGS log_comment = '02769_7b513191-5082-4073-8568-53b86a49da79', parallel_replicas_local_plan=0;
SYSTEM FLUSH LOGS;