From 27db36cd4fbc2b0d74e982a3992778c62ccc2482 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Tue, 16 Jul 2024 08:46:24 +0000 Subject: [PATCH] Separate converting actions in separte source file --- .../QueryPlan/ConvertingActions.cpp | 32 +++++++++++++++++++ src/Processors/QueryPlan/ConvertingActions.h | 9 ++++++ .../QueryPlan/DistributedCreateLocalPlan.cpp | 27 +--------------- .../QueryPlan/ParallelReplicasLocalPlan.cpp | 5 +-- 4 files changed, 45 insertions(+), 28 deletions(-) create mode 100644 src/Processors/QueryPlan/ConvertingActions.cpp create mode 100644 src/Processors/QueryPlan/ConvertingActions.h diff --git a/src/Processors/QueryPlan/ConvertingActions.cpp b/src/Processors/QueryPlan/ConvertingActions.cpp new file mode 100644 index 00000000000..ff106ff08c1 --- /dev/null +++ b/src/Processors/QueryPlan/ConvertingActions.cpp @@ -0,0 +1,32 @@ +#include +#include +#include + +namespace DB +{ + +void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missing_objects) +{ + if (blocksHaveEqualStructure(plan.getCurrentDataStream().header, header)) + return; + + auto mode = has_missing_objects ? ActionsDAG::MatchColumnsMode::Position : ActionsDAG::MatchColumnsMode::Name; + + auto get_converting_dag = [mode](const Block & block_, const Block & header_) + { + /// Convert header structure to expected. + /// Also we ignore constants from result and replace it with constants from header. + /// It is needed for functions like `now64()` or `randConstant()` because their values may be different. + return ActionsDAG::makeConvertingActions( + block_.getColumnsWithTypeAndName(), + header_.getColumnsWithTypeAndName(), + mode, + true); + }; + + auto convert_actions_dag = get_converting_dag(plan.getCurrentDataStream().header, header); + auto converting = std::make_unique(plan.getCurrentDataStream(), convert_actions_dag); + plan.addStep(std::move(converting)); +} + +} diff --git a/src/Processors/QueryPlan/ConvertingActions.h b/src/Processors/QueryPlan/ConvertingActions.h new file mode 100644 index 00000000000..6bdf9b8af9a --- /dev/null +++ b/src/Processors/QueryPlan/ConvertingActions.h @@ -0,0 +1,9 @@ +#pragma once + +namespace DB +{ +class QueryPlan; +class Block; + +void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missing_objects); +} diff --git a/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp b/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp index bad0380cb46..eb699858bdf 100644 --- a/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp +++ b/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp @@ -2,38 +2,13 @@ #include #include -#include #include #include -#include +#include namespace DB { -void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missing_objects) -{ - if (blocksHaveEqualStructure(plan.getCurrentDataStream().header, header)) - return; - - auto mode = has_missing_objects ? ActionsDAG::MatchColumnsMode::Position : ActionsDAG::MatchColumnsMode::Name; - - auto get_converting_dag = [mode](const Block & block_, const Block & header_) - { - /// Convert header structure to expected. - /// Also we ignore constants from result and replace it with constants from header. - /// It is needed for functions like `now64()` or `randConstant()` because their values may be different. - return ActionsDAG::makeConvertingActions( - block_.getColumnsWithTypeAndName(), - header_.getColumnsWithTypeAndName(), - mode, - true); - }; - - auto convert_actions_dag = get_converting_dag(plan.getCurrentDataStream().header, header); - auto converting = std::make_unique(plan.getCurrentDataStream(), convert_actions_dag); - plan.addStep(std::move(converting)); -} - std::unique_ptr createLocalPlan( const ASTPtr & query_ast, const Block & header, diff --git a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp index 5f48a12072b..d2e862a3416 100644 --- a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp +++ b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -77,8 +78,8 @@ std::unique_ptr createLocalPlanForParallelReplicas( analyzed_result_ptr = analyzed_merge_tree->getAnalyzedResult(); } - MergeTreeAllRangesCallback all_ranges_cb - = [coordinator](InitialAllRangesAnnouncement announcement) { coordinator->handleInitialAllRangesAnnouncement(announcement); }; + MergeTreeAllRangesCallback all_ranges_cb = [coordinator](InitialAllRangesAnnouncement announcement) + { coordinator->handleInitialAllRangesAnnouncement(std::move(announcement)); }; MergeTreeReadTaskCallback read_task_cb = [coordinator](ParallelReadRequest req) -> std::optional { return coordinator->handleRequest(std::move(req)); };