merge_row_policy: namesDifference, try to handle nonselected columns

This commit is contained in:
Ilya Golshtein 2023-07-11 11:46:37 +00:00
parent a447b3f9e0
commit f4c77c1f18
3 changed files with 118 additions and 34 deletions

View File

@ -66,6 +66,26 @@ bool columnDefaultKindHasSameType(ColumnDefaultKind lhs, ColumnDefaultKind rhs)
return false; return false;
} }
std::string namesDifference(Names && outer_set, Names && inner_set)
{
std::sort(outer_set.begin(), outer_set.end());
std::sort(inner_set.begin(), inner_set.end());
Names result;
std::set_difference(outer_set.begin(), outer_set.end(),
inner_set.begin(), inner_set.end(), std::inserter(result, result.begin()));
if (result.size() != 1)
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot determine row level filter");
}
return result.front();
}
} }
namespace DB namespace DB
@ -682,29 +702,22 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
{ {
ASTPtr expr = row_policy_filter->expression; ASTPtr expr = row_policy_filter->expression;
auto syntax_result = TreeRewriter(modified_context).analyze(expr, header.getNamesAndTypesList()); auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
auto storage_columns = storage_metadata_snapshot->getColumns();
auto needed_columns = storage_columns.getAllPhysical(); // header.getNamesAndTypesList()
auto syntax_result = TreeRewriter(modified_context).analyze(expr, needed_columns);
auto expression_analyzer = ExpressionAnalyzer{row_policy_filter->expression, syntax_result, modified_context}; auto expression_analyzer = ExpressionAnalyzer{row_policy_filter->expression, syntax_result, modified_context};
auto filter_dag_ptr = expression_analyzer.getActionsDAG(true, false); auto actions_dag = expression_analyzer.getActionsDAG(true, false);
auto filter_actions = std::make_shared<ExpressionActions>(filter_dag_ptr, ExpressionActionsSettings::fromContext(modified_context, CompileExpressions::yes)); auto filter_actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(modified_context, CompileExpressions::yes));
auto required_columns = filter_actions->getRequiredColumns(); auto required_columns = filter_actions->getRequiredColumns();
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "filter_actions_dag: {},<> {}, <> {}", LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertingSourceStream"), "filter_actions_dag: {},<> {}, <> {}",
filter_actions->getActionsDAG().dumpNames(), filter_actions->getActionsDAG().dumpDAG(), filter_actions->getSampleBlock().dumpStructure()); filter_actions->getActionsDAG().dumpNames(), filter_actions->getActionsDAG().dumpDAG(), filter_actions->getSampleBlock().dumpStructure());
auto fa_actions_columns_sorted = filter_actions->getSampleBlock().getNames(); auto filter_column_name = namesDifference(filter_actions->getSampleBlock().getNames(), filter_actions->getRequiredColumns());
std::sort(fa_actions_columns_sorted.begin(), fa_actions_columns_sorted.end()); source_step_with_filter->addFilter(actions_dag, filter_column_name);
Names required_columns_sorted = required_columns;
std::sort(required_columns_sorted.begin(), required_columns_sorted.end());
Names filter_columns;
std::set_difference(fa_actions_columns_sorted.begin(), fa_actions_columns_sorted.end(),
required_columns.begin(), required_columns.end(),
std::inserter(filter_columns, filter_columns.begin()));
source_step_with_filter->addFilter(filter_dag_ptr, filter_columns.front());
} }
} }
} }
@ -1059,33 +1072,39 @@ void ReadFromMerge::convertingSourceStream(
{ {
ASTPtr expr = row_policy_filter->expression; ASTPtr expr = row_policy_filter->expression;
auto syntax_result = TreeRewriter(local_context).analyze(expr, pipe_columns); auto storage_columns = metadata_snapshot->getColumns();
auto needed_columns = storage_columns.getAllPhysical(); // header.getNamesAndTypesList()
auto syntax_result = TreeRewriter(local_context).analyze(expr, needed_columns /* pipe_columns */);
auto expression_analyzer = ExpressionAnalyzer{row_policy_filter->expression, syntax_result, local_context}; auto expression_analyzer = ExpressionAnalyzer{row_policy_filter->expression, syntax_result, local_context};
auto actions_dag = expression_analyzer.getActionsDAG(true, false); auto actions_dag = expression_analyzer.getActionsDAG(true, false);
auto filter_actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes)); auto filter_actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
auto required_columns = filter_actions->getRequiredColumns();
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "filter_actions_dag: {},<> {}, <> {}", LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "filter_actions_dag: {},<> {}, <> {}",
filter_actions->getActionsDAG().dumpNames(), filter_actions->getActionsDAG().dumpDAG(), filter_actions->getSampleBlock().dumpStructure()); filter_actions->getActionsDAG().dumpNames(), filter_actions->getActionsDAG().dumpDAG(), filter_actions->getSampleBlock().dumpStructure());
auto fa_actions_columns_sorted = filter_actions->getSampleBlock().getNames();
std::sort(fa_actions_columns_sorted.begin(), fa_actions_columns_sorted.end());
Names required_columns_sorted = required_columns; for (auto & colname : filter_actions->getSampleBlock().getNames())
std::sort(required_columns_sorted.begin(), required_columns_sorted.end()); {
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "filter_actions->getSampleBlock().getNames(): {}", colname);
}
Names filter_columns; for (auto & colname : filter_actions->getRequiredColumns())
{
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "filter_actions->getRequiredColumns(): {}", colname);
}
std::set_difference(fa_actions_columns_sorted.begin(), fa_actions_columns_sorted.end(),
required_columns.begin(), required_columns.end(), auto filter_column_name = namesDifference(filter_actions->getSampleBlock().getNames(), filter_actions->getRequiredColumns());
std::inserter(filter_columns, filter_columns.begin()));
builder.addSimpleTransform([&](const Block & stream_header) builder.addSimpleTransform([&](const Block & stream_header)
{ {
return std::make_shared<FilterTransform>(stream_header, filter_actions, filter_columns.front(), true /* remove fake column */); return std::make_shared<FilterTransform>(stream_header, filter_actions, filter_column_name, true /* remove fake column */);
}); });
} }
} }

View File

@ -1,3 +1,9 @@
SELECT * FROM 02763_merge_log_1 ORDER BY x
1 11
2 12
3 13
4 14
SELECT * FROM merge(currentDatabase(), 02763_merge) ORDER BY x
1 11 1 11
1 11 1 11
1 11 1 11
@ -14,10 +20,6 @@
4 14 4 14
4 14 4 14
4 14 4 14
1 11
2 12
3 13
4 14
SETTINGS optimize_move_to_prewhere= 0 SETTINGS optimize_move_to_prewhere= 0
SELECT * FROM 02763_merge_log_1 SELECT * FROM 02763_merge_log_1
3 13 3 13
@ -98,6 +100,28 @@ SELECT x, SUM(x) FROM (SELECT * FROM merge(...) UNION ALL ...) GROUP BY x
2 24 2 24
3 39 3 39
4 42 4 42
1 11 0
2 12 0
3 13 0
4 14 1
4 14 1
SELECT * FROM merge(currentDatabase(), 02763_merge_log) WHERE x>1 -- with y>12
2 12
3 13
3 13
4 14
4 14
SELECT * FROM merge(currentDatabase(), 02763_merge_merge) WHERE x>1 -- with y>12
2 12
3 13
3 13
4 14
4 14
2 12 0
3 13 1
3 13 1
4 14 1
4 14 1
SETTINGS optimize_move_to_prewhere= 1 SETTINGS optimize_move_to_prewhere= 1
SELECT * FROM 02763_merge_log_1 SELECT * FROM 02763_merge_log_1
3 13 3 13
@ -178,3 +202,25 @@ SELECT x, SUM(x) FROM (SELECT * FROM merge(...) UNION ALL ...) GROUP BY x
2 24 2 24
3 39 3 39
4 42 4 42
1 11 0
2 12 0
3 13 0
4 14 1
4 14 1
SELECT * FROM merge(currentDatabase(), 02763_merge_log) WHERE x>1 -- with y>12
2 12
3 13
3 13
4 14
4 14
SELECT * FROM merge(currentDatabase(), 02763_merge_merge) WHERE x>1 -- with y>12
2 12
3 13
3 13
4 14
4 14
2 12 0
3 13 1
3 13 1
4 14 1
4 14 1

View File

@ -17,10 +17,12 @@ INSERT INTO 02763_merge_log_2 VALUES (1, 11), (2, 12), (3, 13), (4, 14);
INSERT INTO 02763_merge_merge_1 VALUES (1, 11), (2, 12), (3, 13), (4, 14); INSERT INTO 02763_merge_merge_1 VALUES (1, 11), (2, 12), (3, 13), (4, 14);
INSERT INTO 02763_merge_merge_2 VALUES (1, 11), (2, 12), (3, 13), (4, 14); INSERT INTO 02763_merge_merge_2 VALUES (1, 11), (2, 12), (3, 13), (4, 14);
SELECT * FROM merge(currentDatabase(), '02763_merge') ORDER BY x; SELECT 'SELECT * FROM 02763_merge_log_1 ORDER BY x';
SELECT * FROM 02763_merge_log_1 ORDER BY x; SELECT * FROM 02763_merge_log_1 ORDER BY x;
SELECT 'SELECT * FROM merge(currentDatabase(), 02763_merge) ORDER BY x';
SELECT * FROM merge(currentDatabase(), '02763_merge') ORDER BY x;
{% for prew in [0 , 1] -%} {% for prew in [0 , 1] -%}
@ -81,7 +83,24 @@ SELECT * FROM merge(currentDatabase(), '02763_merge.*2'))
GROUP BY x GROUP BY x
ORDER BY x; ORDER BY x;
SELECT *, x=4 FROM merge(currentDatabase(), '02763_merge_merge') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}};
CREATE ROW POLICY 02763_filter_3 ON 02763_merge_log_1 USING y>12 AS permissive TO ALL;
SELECT 'SELECT * FROM merge(currentDatabase(), 02763_merge_log) WHERE x>1 -- with y>12';
SELECT * FROM merge(currentDatabase(), '02763_merge_log') WHERE x>1 ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}};
CREATE ROW POLICY 02763_filter_4 ON 02763_merge_merge_1 USING y>12 AS permissive TO ALL;
SELECT 'SELECT * FROM merge(currentDatabase(), 02763_merge_merge) WHERE x>1 -- with y>12';
SELECT * FROM merge(currentDatabase(), '02763_merge_merge') WHERE x>1 ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}};
SELECT *, (x=4 OR y>12) FROM merge(currentDatabase(), '02763_merge_merge') WHERE x>1 ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}};
DROP ROW POLICY 02763_filter_1 ON 02763_merge_log_1; DROP ROW POLICY 02763_filter_1 ON 02763_merge_log_1;
DROP ROW POLICY 02763_filter_2 ON 02763_merge_merge_1; DROP ROW POLICY 02763_filter_2 ON 02763_merge_merge_1;
DROP ROW POLICY 02763_filter_3 ON 02763_merge_log_1;
DROP ROW POLICY 02763_filter_4 ON 02763_merge_merge_1;
{% endfor %} {% endfor %}