From f4c77c1f1827c639aefc8e12cac78080b8f380cc Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Tue, 11 Jul 2023 11:46:37 +0000 Subject: [PATCH] merge_row_policy: namesDifference, try to handle nonselected columns --- src/Storages/StorageMerge.cpp | 75 ++++++++++++------- .../02763_row_policy_storage_merge.reference | 54 ++++++++++++- .../02763_row_policy_storage_merge.sql.j2 | 23 +++++- 3 files changed, 118 insertions(+), 34 deletions(-) diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index 483f592f819..ce1fdece231 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -66,6 +66,26 @@ bool columnDefaultKindHasSameType(ColumnDefaultKind lhs, ColumnDefaultKind rhs) return false; } +std::string namesDifference(Names && outer_set, Names && inner_set) +{ + std::sort(outer_set.begin(), outer_set.end()); + + std::sort(inner_set.begin(), inner_set.end()); + + Names result; + + std::set_difference(outer_set.begin(), outer_set.end(), + inner_set.begin(), inner_set.end(), std::inserter(result, result.begin())); + + if (result.size() != 1) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Cannot determine row level filter"); + } + + return result.front(); +} + } namespace DB @@ -682,29 +702,22 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources( { ASTPtr expr = row_policy_filter->expression; - auto syntax_result = TreeRewriter(modified_context).analyze(expr, header.getNamesAndTypesList()); + auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr(); + auto storage_columns = storage_metadata_snapshot->getColumns(); + auto needed_columns = storage_columns.getAllPhysical(); // header.getNamesAndTypesList() + + auto syntax_result = TreeRewriter(modified_context).analyze(expr, needed_columns); auto expression_analyzer = ExpressionAnalyzer{row_policy_filter->expression, syntax_result, modified_context}; - auto filter_dag_ptr = expression_analyzer.getActionsDAG(true, false); - auto filter_actions = std::make_shared(filter_dag_ptr, ExpressionActionsSettings::fromContext(modified_context, CompileExpressions::yes)); + auto actions_dag = expression_analyzer.getActionsDAG(true, false); + auto filter_actions = std::make_shared(actions_dag, ExpressionActionsSettings::fromContext(modified_context, CompileExpressions::yes)); auto required_columns = filter_actions->getRequiredColumns(); - LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "filter_actions_dag: {},<> {}, <> {}", + LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertingSourceStream"), "filter_actions_dag: {},<> {}, <> {}", filter_actions->getActionsDAG().dumpNames(), filter_actions->getActionsDAG().dumpDAG(), filter_actions->getSampleBlock().dumpStructure()); - auto fa_actions_columns_sorted = filter_actions->getSampleBlock().getNames(); - std::sort(fa_actions_columns_sorted.begin(), fa_actions_columns_sorted.end()); - - Names required_columns_sorted = required_columns; - std::sort(required_columns_sorted.begin(), required_columns_sorted.end()); - - Names filter_columns; - - std::set_difference(fa_actions_columns_sorted.begin(), fa_actions_columns_sorted.end(), - required_columns.begin(), required_columns.end(), - std::inserter(filter_columns, filter_columns.begin())); - - source_step_with_filter->addFilter(filter_dag_ptr, filter_columns.front()); + auto filter_column_name = namesDifference(filter_actions->getSampleBlock().getNames(), filter_actions->getRequiredColumns()); + source_step_with_filter->addFilter(actions_dag, filter_column_name); } } } @@ -1059,33 +1072,39 @@ void ReadFromMerge::convertingSourceStream( { ASTPtr expr = row_policy_filter->expression; - auto syntax_result = TreeRewriter(local_context).analyze(expr, pipe_columns); + auto storage_columns = metadata_snapshot->getColumns(); + auto needed_columns = storage_columns.getAllPhysical(); // header.getNamesAndTypesList() + + + auto syntax_result = TreeRewriter(local_context).analyze(expr, needed_columns /* pipe_columns */); auto expression_analyzer = ExpressionAnalyzer{row_policy_filter->expression, syntax_result, local_context}; auto actions_dag = expression_analyzer.getActionsDAG(true, false); auto filter_actions = std::make_shared(actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes)); - auto required_columns = filter_actions->getRequiredColumns(); LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "filter_actions_dag: {},<> {}, <> {}", filter_actions->getActionsDAG().dumpNames(), filter_actions->getActionsDAG().dumpDAG(), filter_actions->getSampleBlock().dumpStructure()); - auto fa_actions_columns_sorted = filter_actions->getSampleBlock().getNames(); - std::sort(fa_actions_columns_sorted.begin(), fa_actions_columns_sorted.end()); - Names required_columns_sorted = required_columns; - std::sort(required_columns_sorted.begin(), required_columns_sorted.end()); + for (auto & colname : filter_actions->getSampleBlock().getNames()) + { + LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "filter_actions->getSampleBlock().getNames(): {}", colname); + } - Names filter_columns; + for (auto & colname : filter_actions->getRequiredColumns()) + { + LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "filter_actions->getRequiredColumns(): {}", colname); + } - std::set_difference(fa_actions_columns_sorted.begin(), fa_actions_columns_sorted.end(), - required_columns.begin(), required_columns.end(), - std::inserter(filter_columns, filter_columns.begin())); + + auto filter_column_name = namesDifference(filter_actions->getSampleBlock().getNames(), filter_actions->getRequiredColumns()); + builder.addSimpleTransform([&](const Block & stream_header) { - return std::make_shared(stream_header, filter_actions, filter_columns.front(), true /* remove fake column */); + return std::make_shared(stream_header, filter_actions, filter_column_name, true /* remove fake column */); }); } } diff --git a/tests/queries/0_stateless/02763_row_policy_storage_merge.reference b/tests/queries/0_stateless/02763_row_policy_storage_merge.reference index 444513c6c20..55890a11783 100644 --- a/tests/queries/0_stateless/02763_row_policy_storage_merge.reference +++ b/tests/queries/0_stateless/02763_row_policy_storage_merge.reference @@ -1,3 +1,9 @@ +SELECT * FROM 02763_merge_log_1 ORDER BY x +1 11 +2 12 +3 13 +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge) ORDER BY x 1 11 1 11 1 11 @@ -14,10 +20,6 @@ 4 14 4 14 4 14 -1 11 -2 12 -3 13 -4 14 SETTINGS optimize_move_to_prewhere= 0 SELECT * FROM 02763_merge_log_1 3 13 @@ -98,6 +100,28 @@ SELECT x, SUM(x) FROM (SELECT * FROM merge(...) UNION ALL ...) GROUP BY x 2 24 3 39 4 42 +1 11 0 +2 12 0 +3 13 0 +4 14 1 +4 14 1 +SELECT * FROM merge(currentDatabase(), 02763_merge_log) WHERE x>1 -- with y>12 +2 12 +3 13 +3 13 +4 14 +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge_merge) WHERE x>1 -- with y>12 +2 12 +3 13 +3 13 +4 14 +4 14 +2 12 0 +3 13 1 +3 13 1 +4 14 1 +4 14 1 SETTINGS optimize_move_to_prewhere= 1 SELECT * FROM 02763_merge_log_1 3 13 @@ -178,3 +202,25 @@ SELECT x, SUM(x) FROM (SELECT * FROM merge(...) UNION ALL ...) GROUP BY x 2 24 3 39 4 42 +1 11 0 +2 12 0 +3 13 0 +4 14 1 +4 14 1 +SELECT * FROM merge(currentDatabase(), 02763_merge_log) WHERE x>1 -- with y>12 +2 12 +3 13 +3 13 +4 14 +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge_merge) WHERE x>1 -- with y>12 +2 12 +3 13 +3 13 +4 14 +4 14 +2 12 0 +3 13 1 +3 13 1 +4 14 1 +4 14 1 diff --git a/tests/queries/0_stateless/02763_row_policy_storage_merge.sql.j2 b/tests/queries/0_stateless/02763_row_policy_storage_merge.sql.j2 index b5094f927f4..94872dfd7b6 100644 --- a/tests/queries/0_stateless/02763_row_policy_storage_merge.sql.j2 +++ b/tests/queries/0_stateless/02763_row_policy_storage_merge.sql.j2 @@ -17,10 +17,12 @@ INSERT INTO 02763_merge_log_2 VALUES (1, 11), (2, 12), (3, 13), (4, 14); INSERT INTO 02763_merge_merge_1 VALUES (1, 11), (2, 12), (3, 13), (4, 14); INSERT INTO 02763_merge_merge_2 VALUES (1, 11), (2, 12), (3, 13), (4, 14); -SELECT * FROM merge(currentDatabase(), '02763_merge') ORDER BY x; - +SELECT 'SELECT * FROM 02763_merge_log_1 ORDER BY x'; SELECT * FROM 02763_merge_log_1 ORDER BY x; +SELECT 'SELECT * FROM merge(currentDatabase(), 02763_merge) ORDER BY x'; +SELECT * FROM merge(currentDatabase(), '02763_merge') ORDER BY x; + {% for prew in [0 , 1] -%} @@ -81,7 +83,24 @@ SELECT * FROM merge(currentDatabase(), '02763_merge.*2')) GROUP BY x ORDER BY x; +SELECT *, x=4 FROM merge(currentDatabase(), '02763_merge_merge') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + + +CREATE ROW POLICY 02763_filter_3 ON 02763_merge_log_1 USING y>12 AS permissive TO ALL; +SELECT 'SELECT * FROM merge(currentDatabase(), 02763_merge_log) WHERE x>1 -- with y>12'; +SELECT * FROM merge(currentDatabase(), '02763_merge_log') WHERE x>1 ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + +CREATE ROW POLICY 02763_filter_4 ON 02763_merge_merge_1 USING y>12 AS permissive TO ALL; +SELECT 'SELECT * FROM merge(currentDatabase(), 02763_merge_merge) WHERE x>1 -- with y>12'; +SELECT * FROM merge(currentDatabase(), '02763_merge_merge') WHERE x>1 ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + +SELECT *, (x=4 OR y>12) FROM merge(currentDatabase(), '02763_merge_merge') WHERE x>1 ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + + DROP ROW POLICY 02763_filter_1 ON 02763_merge_log_1; DROP ROW POLICY 02763_filter_2 ON 02763_merge_merge_1; +DROP ROW POLICY 02763_filter_3 ON 02763_merge_log_1; +DROP ROW POLICY 02763_filter_4 ON 02763_merge_merge_1; + {% endfor %}