From b57d8bc4a91eddcc41469cc21f983237797fa272 Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Tue, 11 Jul 2023 21:16:54 +0000 Subject: [PATCH] merge_row_policy: works again with adding missed columns --- src/Storages/StorageMerge.cpp | 119 +++++++++++++++++++++------------- 1 file changed, 73 insertions(+), 46 deletions(-) diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index ce1fdece231..920604c876a 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -681,6 +682,29 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources( StorageView * view = dynamic_cast(storage.get()); if (!view || allow_experimental_analyzer) { + + auto row_policy_filter = modified_context->getRowPolicyFilter(database_name, table_name, RowPolicyFilterType::SELECT_FILTER); + if (row_policy_filter) + { + ASTPtr expr = row_policy_filter->expression; + + RequiredSourceColumnsVisitor::Data columns_context; + RequiredSourceColumnsVisitor(columns_context).visit(expr); + + auto req_columns = columns_context.requiredColumns(); + for (const auto & req_column : req_columns) + { + LOG_TRACE(&Poco::Logger::get("ReadFromMerge::createSources"), "req.column: {}", req_column); + std::sort(real_column_names.begin(), real_column_names.end()); + + if (!std::binary_search(real_column_names.begin(), real_column_names.end(), req_column)) + { + real_column_names.push_back(req_column); + } + } + } + + storage->read(plan, real_column_names, storage_snapshot, @@ -694,13 +718,12 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources( if (!plan.isInitialized()) return {}; - if (auto * source_step_with_filter = dynamic_cast((plan.getRootNode()->step.get()))) - { - auto row_policy_filter = modified_context->getRowPolicyFilter(database_name, table_name, RowPolicyFilterType::SELECT_FILTER); - if (row_policy_filter) + if (row_policy_filter) + { + ASTPtr expr = row_policy_filter->expression; + if (auto * source_step_with_filter = dynamic_cast((plan.getRootNode()->step.get()))) { - ASTPtr expr = row_policy_filter->expression; auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr(); auto storage_columns = storage_metadata_snapshot->getColumns(); @@ -713,12 +736,13 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources( auto filter_actions = std::make_shared(actions_dag, ExpressionActionsSettings::fromContext(modified_context, CompileExpressions::yes)); auto required_columns = filter_actions->getRequiredColumns(); - LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertingSourceStream"), "filter_actions_dag: {},<> {}, <> {}", + LOG_TRACE(&Poco::Logger::get("ReadFromMerge::createSources"), "filter_actions_dag: {},<> {}, <> {}", filter_actions->getActionsDAG().dumpNames(), filter_actions->getActionsDAG().dumpDAG(), filter_actions->getSampleBlock().dumpStructure()); auto filter_column_name = namesDifference(filter_actions->getSampleBlock().getNames(), filter_actions->getRequiredColumns()); source_step_with_filter->addFilter(actions_dag, filter_column_name); } + } } else @@ -1026,46 +1050,6 @@ void ReadFromMerge::convertingSourceStream( const String & database_name, const String & table_name) { - Block before_block_header = builder.getHeader(); - - auto storage_sample_block = metadata_snapshot->getSampleBlock(); - auto pipe_columns = builder.getHeader().getNamesAndTypesList(); - - for (const auto & alias : aliases) - { - pipe_columns.emplace_back(NameAndTypePair(alias.name, alias.type)); - ASTPtr expr = alias.expression; - auto syntax_result = TreeRewriter(local_context).analyze(expr, pipe_columns); - auto expression_analyzer = ExpressionAnalyzer{alias.expression, syntax_result, local_context}; - - auto dag = std::make_shared(pipe_columns); - auto actions_dag = expression_analyzer.getActionsDAG(true, false); - auto actions = std::make_shared(actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes)); - - builder.addSimpleTransform([&](const Block & stream_header) - { - return std::make_shared(stream_header, actions); - }); - } - - ActionsDAG::MatchColumnsMode convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Name; - - if (local_context->getSettingsRef().allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns) - convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Position; - - auto convert_actions_dag = ActionsDAG::makeConvertingActions(builder.getHeader().getColumnsWithTypeAndName(), - header.getColumnsWithTypeAndName(), - convert_actions_match_columns_mode); - auto actions = std::make_shared( - std::move(convert_actions_dag), - ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes)); - - builder.addSimpleTransform([&](const Block & stream_header) - { - return std::make_shared(stream_header, actions); - }); - - auto row_policy_filter = local_context->getRowPolicyFilter(database_name, table_name, RowPolicyFilterType::SELECT_FILTER); if (row_policy_filter) @@ -1107,6 +1091,49 @@ void ReadFromMerge::convertingSourceStream( return std::make_shared(stream_header, filter_actions, filter_column_name, true /* remove fake column */); }); } + + + + Block before_block_header = builder.getHeader(); + + auto storage_sample_block = metadata_snapshot->getSampleBlock(); + auto pipe_columns = builder.getHeader().getNamesAndTypesList(); + + for (const auto & alias : aliases) + { + pipe_columns.emplace_back(NameAndTypePair(alias.name, alias.type)); + ASTPtr expr = alias.expression; + auto syntax_result = TreeRewriter(local_context).analyze(expr, pipe_columns); + auto expression_analyzer = ExpressionAnalyzer{alias.expression, syntax_result, local_context}; + + auto dag = std::make_shared(pipe_columns); + auto actions_dag = expression_analyzer.getActionsDAG(true, false); + auto actions = std::make_shared(actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes)); + + builder.addSimpleTransform([&](const Block & stream_header) + { + return std::make_shared(stream_header, actions); + }); + } + + ActionsDAG::MatchColumnsMode convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Name; + + if (local_context->getSettingsRef().allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns) + convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Position; + + auto convert_actions_dag = ActionsDAG::makeConvertingActions(builder.getHeader().getColumnsWithTypeAndName(), + header.getColumnsWithTypeAndName(), + convert_actions_match_columns_mode); + auto actions = std::make_shared( + std::move(convert_actions_dag), + ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes)); + + builder.addSimpleTransform([&](const Block & stream_header) + { + return std::make_shared(stream_header, actions); + }); + + } bool ReadFromMerge::requestReadingInOrder(InputOrderInfoPtr order_info_)