Support additional_result_filter

This commit is contained in:
Dmitry Novik 2023-04-06 19:18:26 +00:00
parent 4cb1ffa2eb
commit a60c315b3e
4 changed files with 120 additions and 43 deletions

View File

@ -33,8 +33,11 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Interpreters/Context.h>
#include <Interpreters/StorageID.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDummy.h>
#include <Storages/IStorage.h>
#include <Analyzer/Utils.h>
@ -911,6 +914,46 @@ void addBuildSubqueriesForSetsStepIfNeeded(QueryPlan & query_plan,
addCreatingSetsStep(query_plan, std::move(subqueries_for_sets), planner_context->getQueryContext());
}
/// Support for `additional_result_filter` setting
void addAdditionalFilterStepIfNeeded(QueryPlan & query_plan,
const QueryNode & query_node,
const SelectQueryOptions & select_query_options,
PlannerContextPtr & planner_context
)
{
if (select_query_options.subquery_depth != 0)
return;
const auto & query_context = planner_context->getQueryContext();
const auto & settings = query_context->getSettingsRef();
auto additional_result_filter_ast = parseAdditionalResultFilter(settings);
if (!additional_result_filter_ast)
return;
ColumnsDescription fake_column_descriptions;
NameSet fake_name_set;
for (const auto & column : query_node.getProjectionColumns())
{
fake_column_descriptions.add(ColumnDescription(column.name, column.type));
fake_name_set.emplace(column.name);
}
auto storage = std::make_shared<StorageDummy>(StorageID{"dummy", "dummy"}, fake_column_descriptions);
auto fake_table_expression = std::make_shared<TableNode>(std::move(storage), query_context);
auto filter_info = buildFilterInfo(additional_result_filter_ast, fake_table_expression, planner_context, std::move(fake_name_set));
if (!filter_info.actions || !query_plan.isInitialized())
return;
auto filter_step = std::make_unique<FilterStep>(query_plan.getCurrentDataStream(),
filter_info.actions,
filter_info.column_name,
filter_info.do_remove_column);
filter_step->setStepDescription("additional result filter");
query_plan.addStep(std::move(filter_step));
}
}
PlannerContextPtr buildPlannerContext(const QueryTreeNodePtr & query_tree_node,
@ -1409,6 +1452,9 @@ void Planner::buildPlanForQueryNode()
const auto & projection_analysis_result = expression_analysis_result.getProjection();
addExpressionStep(query_plan, projection_analysis_result.project_names_actions, "Project names", result_actions_to_execute);
}
// For additional_result_filter setting
addAdditionalFilterStepIfNeeded(query_plan, query_node, select_query_options, planner_context);
}
if (!select_query_options.only_analyze)

View File

@ -387,46 +387,6 @@ void updatePrewhereOutputsIfNeeded(SelectQueryInfo & table_expression_query_info
prewhere_outputs.insert(prewhere_outputs.end(), required_output_nodes.begin(), required_output_nodes.end());
}
FilterDAGInfo buildFilterInfo(ASTPtr filter_expression,
SelectQueryInfo & table_expression_query_info,
PlannerContextPtr & planner_context)
{
const auto & query_context = planner_context->getQueryContext();
auto filter_query_tree = buildQueryTree(filter_expression, query_context);
QueryAnalysisPass query_analysis_pass(table_expression_query_info.table_expression);
query_analysis_pass.run(filter_query_tree, query_context);
auto & table_expression_data = planner_context->getTableExpressionDataOrThrow(table_expression_query_info.table_expression);
const auto table_expression_names = table_expression_data.getColumnNames();
NameSet table_expression_required_names_without_filter(table_expression_names.begin(), table_expression_names.end());
collectSourceColumns(filter_query_tree, planner_context);
collectSets(filter_query_tree, *planner_context);
auto filter_actions_dag = std::make_shared<ActionsDAG>();
PlannerActionsVisitor actions_visitor(planner_context, false /*use_column_identifier_as_action_node_name*/);
auto expression_nodes = actions_visitor.visit(filter_actions_dag, filter_query_tree);
if (expression_nodes.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Filter actions must return single output node. Actual {}",
expression_nodes.size());
auto & filter_actions_outputs = filter_actions_dag->getOutputs();
filter_actions_outputs = std::move(expression_nodes);
std::string filter_node_name = filter_actions_outputs[0]->result_name;
bool remove_filter_column = true;
for (const auto & filter_input_node : filter_actions_dag->getInputs())
if (table_expression_required_names_without_filter.contains(filter_input_node->result_name))
filter_actions_outputs.push_back(filter_input_node);
return {std::move(filter_actions_dag), std::move(filter_node_name), remove_filter_column};
}
FilterDAGInfo buildRowPolicyFilterIfNeeded(const StoragePtr & storage,
SelectQueryInfo & table_expression_query_info,
PlannerContextPtr & planner_context)
@ -438,7 +398,7 @@ FilterDAGInfo buildRowPolicyFilterIfNeeded(const StoragePtr & storage,
if (!row_policy_filter)
return {};
return buildFilterInfo(row_policy_filter->expression, table_expression_query_info, planner_context);
return buildFilterInfo(row_policy_filter->expression, table_expression_query_info.table_expression, planner_context);
}
FilterDAGInfo buildCustomKeyFilterIfNeeded(const StoragePtr & storage,
@ -469,7 +429,7 @@ FilterDAGInfo buildCustomKeyFilterIfNeeded(const StoragePtr & storage,
*storage,
query_context);
return buildFilterInfo(parallel_replicas_custom_filter_ast, table_expression_query_info, planner_context);
return buildFilterInfo(parallel_replicas_custom_filter_ast, table_expression_query_info.table_expression, planner_context);
}
/// Apply filters from additional_table_filters setting
@ -516,7 +476,7 @@ FilterDAGInfo buildAdditionalFiltersIfNeeded(const StoragePtr & storage,
LOG_DEBUG(&Poco::Logger::get("buildAdditionalFiltersIfNeeded"), "Found additional filter: {}", additional_filter_ast->formatForErrorMessage());
return buildFilterInfo(additional_filter_ast, table_expression_query_info, planner_context);
return buildFilterInfo(additional_filter_ast, table_expression_query_info.table_expression, planner_context);
}
JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expression,

View File

@ -3,6 +3,8 @@
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeLowCardinality.h>
@ -28,8 +30,12 @@
#include <Analyzer/TableFunctionNode.h>
#include <Analyzer/ArrayJoinNode.h>
#include <Analyzer/JoinNode.h>
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Planner/PlannerActionsVisitor.h>
#include <Planner/CollectTableExpressionData.h>
#include <Planner/CollectSets.h>
namespace DB
{
@ -416,4 +422,61 @@ SelectQueryInfo buildSelectQueryInfo(const QueryTreeNodePtr & query_tree, const
return select_query_info;
}
FilterDAGInfo buildFilterInfo(ASTPtr filter_expression,
const QueryTreeNodePtr & table_expression,
PlannerContextPtr & planner_context,
NameSet table_expression_required_names_without_filter)
{
const auto & query_context = planner_context->getQueryContext();
auto filter_query_tree = buildQueryTree(filter_expression, query_context);
QueryAnalysisPass query_analysis_pass(table_expression);
query_analysis_pass.run(filter_query_tree, query_context);
if (table_expression_required_names_without_filter.empty())
{
auto & table_expression_data = planner_context->getTableExpressionDataOrThrow(table_expression);
const auto & table_expression_names = table_expression_data.getColumnNames();
table_expression_required_names_without_filter.insert(table_expression_names.begin(), table_expression_names.end());
}
collectSourceColumns(filter_query_tree, planner_context);
collectSets(filter_query_tree, *planner_context);
auto filter_actions_dag = std::make_shared<ActionsDAG>();
PlannerActionsVisitor actions_visitor(planner_context, false /*use_column_identifier_as_action_node_name*/);
auto expression_nodes = actions_visitor.visit(filter_actions_dag, filter_query_tree);
if (expression_nodes.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Filter actions must return single output node. Actual {}",
expression_nodes.size());
auto & filter_actions_outputs = filter_actions_dag->getOutputs();
filter_actions_outputs = std::move(expression_nodes);
std::string filter_node_name = filter_actions_outputs[0]->result_name;
bool remove_filter_column = true;
for (const auto & filter_input_node : filter_actions_dag->getInputs())
if (table_expression_required_names_without_filter.contains(filter_input_node->result_name))
filter_actions_outputs.push_back(filter_input_node);
return {std::move(filter_actions_dag), std::move(filter_node_name), remove_filter_column};
}
ASTPtr parseAdditionalResultFilter(const Settings & settings)
{
const String & additional_result_filter = settings.additional_result_filter;
if (additional_result_filter.empty())
return {};
ParserExpression parser;
auto additional_result_filter_ast = parseQuery(
parser, additional_result_filter.data(), additional_result_filter.data() + additional_result_filter.size(),
"additional result filter", settings.max_query_size, settings.max_parser_depth);
return additional_result_filter_ast;
}
}

View File

@ -78,4 +78,12 @@ QueryTreeNodePtr buildSubqueryToReadColumnsFromTableExpression(const NamesAndTyp
SelectQueryInfo buildSelectQueryInfo(const QueryTreeNodePtr & query_tree, const PlannerContextPtr & planner_context);
/// Build filter for specific table_expression
FilterDAGInfo buildFilterInfo(ASTPtr filter_expression,
const QueryTreeNodePtr & table_expression,
PlannerContextPtr & planner_context,
NameSet table_expression_required_names_without_filter = {});
ASTPtr parseAdditionalResultFilter(const Settings & settings);
}