merge_row_policy: works again with adding missed columns

This commit is contained in:
Ilya Golshtein 2023-07-11 21:16:54 +00:00
parent f4c77c1f18
commit b57d8bc4a9

View File

@ -16,6 +16,7 @@
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/addTypeConversionToAST.h>
#include <Interpreters/replaceAliasColumnsInQuery.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <Planner/Utils.h>
#include <Analyzer/Utils.h>
#include <Analyzer/ConstantNode.h>
@ -681,6 +682,29 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
StorageView * view = dynamic_cast<StorageView *>(storage.get());
if (!view || allow_experimental_analyzer)
{
auto row_policy_filter = modified_context->getRowPolicyFilter(database_name, table_name, RowPolicyFilterType::SELECT_FILTER);
if (row_policy_filter)
{
ASTPtr expr = row_policy_filter->expression;
RequiredSourceColumnsVisitor::Data columns_context;
RequiredSourceColumnsVisitor(columns_context).visit(expr);
auto req_columns = columns_context.requiredColumns();
for (const auto & req_column : req_columns)
{
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::createSources"), "req.column: {}", req_column);
std::sort(real_column_names.begin(), real_column_names.end());
if (!std::binary_search(real_column_names.begin(), real_column_names.end(), req_column))
{
real_column_names.push_back(req_column);
}
}
}
storage->read(plan,
real_column_names,
storage_snapshot,
@ -694,13 +718,12 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
if (!plan.isInitialized())
return {};
if (auto * source_step_with_filter = dynamic_cast<SourceStepWithFilter*>((plan.getRootNode()->step.get())))
{
auto row_policy_filter = modified_context->getRowPolicyFilter(database_name, table_name, RowPolicyFilterType::SELECT_FILTER);
if (row_policy_filter)
{
ASTPtr expr = row_policy_filter->expression;
if (auto * source_step_with_filter = dynamic_cast<SourceStepWithFilter*>((plan.getRootNode()->step.get())))
{
auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
auto storage_columns = storage_metadata_snapshot->getColumns();
@ -713,12 +736,13 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
auto filter_actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(modified_context, CompileExpressions::yes));
auto required_columns = filter_actions->getRequiredColumns();
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertingSourceStream"), "filter_actions_dag: {},<> {}, <> {}",
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::createSources"), "filter_actions_dag: {},<> {}, <> {}",
filter_actions->getActionsDAG().dumpNames(), filter_actions->getActionsDAG().dumpDAG(), filter_actions->getSampleBlock().dumpStructure());
auto filter_column_name = namesDifference(filter_actions->getSampleBlock().getNames(), filter_actions->getRequiredColumns());
source_step_with_filter->addFilter(actions_dag, filter_column_name);
}
}
}
else
@ -1026,46 +1050,6 @@ void ReadFromMerge::convertingSourceStream(
const String & database_name,
const String & table_name)
{
Block before_block_header = builder.getHeader();
auto storage_sample_block = metadata_snapshot->getSampleBlock();
auto pipe_columns = builder.getHeader().getNamesAndTypesList();
for (const auto & alias : aliases)
{
pipe_columns.emplace_back(NameAndTypePair(alias.name, alias.type));
ASTPtr expr = alias.expression;
auto syntax_result = TreeRewriter(local_context).analyze(expr, pipe_columns);
auto expression_analyzer = ExpressionAnalyzer{alias.expression, syntax_result, local_context};
auto dag = std::make_shared<ActionsDAG>(pipe_columns);
auto actions_dag = expression_analyzer.getActionsDAG(true, false);
auto actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, actions);
});
}
ActionsDAG::MatchColumnsMode convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Name;
if (local_context->getSettingsRef().allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns)
convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Position;
auto convert_actions_dag = ActionsDAG::makeConvertingActions(builder.getHeader().getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
convert_actions_match_columns_mode);
auto actions = std::make_shared<ExpressionActions>(
std::move(convert_actions_dag),
ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, actions);
});
auto row_policy_filter = local_context->getRowPolicyFilter(database_name, table_name, RowPolicyFilterType::SELECT_FILTER);
if (row_policy_filter)
@ -1107,6 +1091,49 @@ void ReadFromMerge::convertingSourceStream(
return std::make_shared<FilterTransform>(stream_header, filter_actions, filter_column_name, true /* remove fake column */);
});
}
Block before_block_header = builder.getHeader();
auto storage_sample_block = metadata_snapshot->getSampleBlock();
auto pipe_columns = builder.getHeader().getNamesAndTypesList();
for (const auto & alias : aliases)
{
pipe_columns.emplace_back(NameAndTypePair(alias.name, alias.type));
ASTPtr expr = alias.expression;
auto syntax_result = TreeRewriter(local_context).analyze(expr, pipe_columns);
auto expression_analyzer = ExpressionAnalyzer{alias.expression, syntax_result, local_context};
auto dag = std::make_shared<ActionsDAG>(pipe_columns);
auto actions_dag = expression_analyzer.getActionsDAG(true, false);
auto actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, actions);
});
}
ActionsDAG::MatchColumnsMode convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Name;
if (local_context->getSettingsRef().allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns)
convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Position;
auto convert_actions_dag = ActionsDAG::makeConvertingActions(builder.getHeader().getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
convert_actions_match_columns_mode);
auto actions = std::make_shared<ExpressionActions>(
std::move(convert_actions_dag),
ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, actions);
});
}
bool ReadFromMerge::requestReadingInOrder(InputOrderInfoPtr order_info_)