Separate converting actions in separte source file

This commit is contained in:
Igor Nikonov 2024-07-16 08:46:24 +00:00
parent 69c1e68359
commit 27db36cd4f
4 changed files with 45 additions and 28 deletions

View File

@ -0,0 +1,32 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Interpreters/ActionsDAG.h>
#include <Processors/QueryPlan/ExpressionStep.h>
namespace DB
{
void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missing_objects)
{
if (blocksHaveEqualStructure(plan.getCurrentDataStream().header, header))
return;
auto mode = has_missing_objects ? ActionsDAG::MatchColumnsMode::Position : ActionsDAG::MatchColumnsMode::Name;
auto get_converting_dag = [mode](const Block & block_, const Block & header_)
{
/// Convert header structure to expected.
/// Also we ignore constants from result and replace it with constants from header.
/// It is needed for functions like `now64()` or `randConstant()` because their values may be different.
return ActionsDAG::makeConvertingActions(
block_.getColumnsWithTypeAndName(),
header_.getColumnsWithTypeAndName(),
mode,
true);
};
auto convert_actions_dag = get_converting_dag(plan.getCurrentDataStream().header, header);
auto converting = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), convert_actions_dag);
plan.addStep(std::move(converting));
}
}

View File

@ -0,0 +1,9 @@
#pragma once
namespace DB
{
class QueryPlan;
class Block;
void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missing_objects);
}

View File

@ -2,38 +2,13 @@
#include <Common/checkStackSize.h> #include <Common/checkStackSize.h>
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/InterpreterSelectQuery.h> #include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h> #include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Processors/QueryPlan/ExpressionStep.h> #include <Processors/QueryPlan/ConvertingActions.h>
namespace DB namespace DB
{ {
void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missing_objects)
{
if (blocksHaveEqualStructure(plan.getCurrentDataStream().header, header))
return;
auto mode = has_missing_objects ? ActionsDAG::MatchColumnsMode::Position : ActionsDAG::MatchColumnsMode::Name;
auto get_converting_dag = [mode](const Block & block_, const Block & header_)
{
/// Convert header structure to expected.
/// Also we ignore constants from result and replace it with constants from header.
/// It is needed for functions like `now64()` or `randConstant()` because their values may be different.
return ActionsDAG::makeConvertingActions(
block_.getColumnsWithTypeAndName(),
header_.getColumnsWithTypeAndName(),
mode,
true);
};
auto convert_actions_dag = get_converting_dag(plan.getCurrentDataStream().header, header);
auto converting = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), convert_actions_dag);
plan.addStep(std::move(converting));
}
std::unique_ptr<QueryPlan> createLocalPlan( std::unique_ptr<QueryPlan> createLocalPlan(
const ASTPtr & query_ast, const ASTPtr & query_ast,
const Block & header, const Block & header,

View File

@ -6,6 +6,7 @@
#include <Interpreters/InterpreterSelectQueryAnalyzer.h> #include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Interpreters/StorageID.h> #include <Interpreters/StorageID.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Processors/QueryPlan/ConvertingActions.h>
#include <Processors/QueryPlan/ExpressionStep.h> #include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/ISourceStep.h> #include <Processors/QueryPlan/ISourceStep.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h> #include <Processors/QueryPlan/ReadFromMergeTree.h>
@ -77,8 +78,8 @@ std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
analyzed_result_ptr = analyzed_merge_tree->getAnalyzedResult(); analyzed_result_ptr = analyzed_merge_tree->getAnalyzedResult();
} }
MergeTreeAllRangesCallback all_ranges_cb MergeTreeAllRangesCallback all_ranges_cb = [coordinator](InitialAllRangesAnnouncement announcement)
= [coordinator](InitialAllRangesAnnouncement announcement) { coordinator->handleInitialAllRangesAnnouncement(announcement); }; { coordinator->handleInitialAllRangesAnnouncement(std::move(announcement)); };
MergeTreeReadTaskCallback read_task_cb = [coordinator](ParallelReadRequest req) -> std::optional<ParallelReadResponse> MergeTreeReadTaskCallback read_task_cb = [coordinator](ParallelReadRequest req) -> std::optional<ParallelReadResponse>
{ return coordinator->handleRequest(std::move(req)); }; { return coordinator->handleRequest(std::move(req)); };