diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 1d009ec3f3b..205ec049975 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -608,11 +608,15 @@ InterpreterSelectQuery::InterpreterSelectQuery( if (storage) { + query_info.filter_asts.clear(); + /// Fix source_header for filter actions. if (row_policy_filter) { filter_info = generateFilterActions( table_id, row_policy_filter, context, storage, storage_snapshot, metadata_snapshot, required_columns); + + query_info.filter_asts.push_back(row_policy_filter); } if (query_info.additional_filter_ast) @@ -621,6 +625,8 @@ InterpreterSelectQuery::InterpreterSelectQuery( table_id, query_info.additional_filter_ast, context, storage, storage_snapshot, metadata_snapshot, required_columns); additional_filter_info->do_remove_column = true; + + query_info.filter_asts.push_back(query_info.additional_filter_ast); } source_header = storage_snapshot->getSampleBlockForColumns(required_columns); @@ -2002,8 +2008,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc && storage && storage->getName() != "MaterializedMySQL" && !storage->hasLightweightDeletedMask() - && !row_policy_filter - && !query_info.additional_filter_ast + && query_info.filter_asts.empty() && processing_stage == QueryProcessingStage::FetchColumns && query_analyzer->hasAggregation() && (query_analyzer->aggregates().size() == 1) @@ -2103,7 +2108,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc && !query.limit_with_ties && !query.prewhere() && !query.where() - && !query_info.additional_filter_ast + && query_info.filter_asts.empty() && !query.groupBy() && !query.having() && !query.orderBy() diff --git a/src/Interpreters/InterpreterSelectQuery.h b/src/Interpreters/InterpreterSelectQuery.h index f2cdcbba9ed..a94c9cb5462 100644 --- a/src/Interpreters/InterpreterSelectQuery.h +++ b/src/Interpreters/InterpreterSelectQuery.h @@ -127,6 +127,8 @@ public: /// It will set shard_num and shard_count to the client_info void setProperClientInfo(size_t replica_num, size_t replica_count); + FilterDAGInfoPtr getAdditionalQueryInfo() const { return additional_filter_info; } + static SortDescription getSortDescription(const ASTSelectQuery & query, const ContextPtr & context); static UInt64 getLimitForSorting(const ASTSelectQuery & query, const ContextPtr & context); diff --git a/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyCondition.cpp b/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyCondition.cpp index e559c23bbaf..7d682c408e5 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyCondition.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyCondition.cpp @@ -1,9 +1,10 @@ #include +#include #include #include #include #include -#include +#include namespace DB::QueryPlanOptimizations { @@ -16,33 +17,41 @@ void optimizePrimaryKeyCondition(QueryPlan::Node & root) size_t next_child = 0; }; - std::stack stack; - stack.push({.node = &root}); + std::deque stack; + stack.push_back({.node = &root}); while (!stack.empty()) { - auto & frame = stack.top(); + auto & frame = stack.back(); /// Traverse all children first. if (frame.next_child < frame.node->children.size()) { - stack.push({.node = frame.node->children[frame.next_child]}); + stack.push_back({.node = frame.node->children[frame.next_child]}); ++frame.next_child; continue; } - if (auto * filter_step = typeid_cast(frame.node->step.get())) + auto add_filter = [&](auto & storage) { - auto * child = frame.node->children.at(0); - if (auto * read_from_merge_tree = typeid_cast(child->step.get())) - read_from_merge_tree->addFilter(filter_step->getExpression(), filter_step->getFilterColumnName()); + for (auto iter=stack.rbegin() + 1; iter!=stack.rend(); ++iter) + { + if (auto * filter_step = typeid_cast(iter->node->step.get())) + storage.addFilter(filter_step->getExpression(), filter_step->getFilterColumnName()); + else if (typeid_cast(iter->node->step.get())) + ; + else + break; + } + }; - if (auto * read_from_merge = typeid_cast(child->step.get())) - read_from_merge->addFilter(filter_step->getExpression(), filter_step->getFilterColumnName()); - } + if (auto * read_from_merge_tree = typeid_cast(frame.node->step.get())) + add_filter(*read_from_merge_tree); + else if (auto * read_from_merge = typeid_cast(frame.node->step.get())) + add_filter(*read_from_merge); - stack.pop(); + stack.pop_back(); } } diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 0d6f591b43a..14b06f9704b 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -835,8 +835,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(Merge return selectRangesToRead( std::move(parts), prewhere_info, - added_filter, - added_filter_column_name, + added_filter_nodes, storage_snapshot->metadata, storage_snapshot->getMetadataForQuery(), query_info, @@ -852,8 +851,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(Merge MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead( MergeTreeData::DataPartsVector parts, const PrewhereInfoPtr & prewhere_info, - const ActionsDAGPtr & added_filter, - const std::string & added_filter_column_name, + const ActionDAGNodes & added_filter_nodes, const StorageMetadataPtr & metadata_snapshot_base, const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & query_info, @@ -895,17 +893,23 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead( ActionDAGNodes nodes; if (prewhere_info) { - const auto & node = prewhere_info->prewhere_actions->findInOutputs(prewhere_info->prewhere_column_name); - nodes.nodes.push_back(&node); + { + const auto & node = prewhere_info->prewhere_actions->findInOutputs(prewhere_info->prewhere_column_name); + nodes.nodes.push_back(&node); + } + + if (prewhere_info->row_level_filter) + { + const auto & node = prewhere_info->row_level_filter->findInOutputs(prewhere_info->row_level_column_name); + nodes.nodes.push_back(&node); + } } - if (added_filter) - { - const auto & node = added_filter->findInOutputs(added_filter_column_name); - nodes.nodes.push_back(&node); - } + for (const auto & node : added_filter_nodes.nodes) + nodes.nodes.push_back(node); - key_condition.emplace(std::move(nodes), query_info.syntax_analyzer_result, query_info.prepared_sets, context, primary_key_columns, primary_key.expression); + key_condition.emplace( + std::move(nodes), query_info.syntax_analyzer_result, query_info.prepared_sets, context, primary_key_columns, primary_key.expression); } else { diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index 1ba68b3fdb3..318f5a4b91f 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -116,8 +116,14 @@ public: void addFilter(ActionsDAGPtr expression, std::string column_name) { - added_filter = std::move(expression); - added_filter_column_name = std::move(column_name); + added_filter_dags.push_back(expression); + added_filter_nodes.nodes.push_back(&expression->findInOutputs(column_name)); + } + + void addFilterNodes(const ActionDAGNodes & filter_nodes) + { + for (const auto & node : filter_nodes.nodes) + added_filter_nodes.nodes.push_back(node); } StorageID getStorageID() const { return data.getStorageID(); } @@ -128,8 +134,7 @@ public: static MergeTreeDataSelectAnalysisResultPtr selectRangesToRead( MergeTreeData::DataPartsVector parts, const PrewhereInfoPtr & prewhere_info, - const ActionsDAGPtr & added_filter, - const std::string & added_filter_column_name, + const ActionDAGNodes & added_filter_nodes, const StorageMetadataPtr & metadata_snapshot_base, const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & query_info, @@ -160,8 +165,8 @@ private: PrewhereInfoPtr prewhere_info; ExpressionActionsSettings actions_settings; - ActionsDAGPtr added_filter; - std::string added_filter_column_name; + std::vector added_filter_dags; + ActionDAGNodes added_filter_nodes; StorageSnapshotPtr storage_snapshot; StorageMetadataPtr metadata_for_reading; diff --git a/src/Storages/MergeTree/KeyCondition.cpp b/src/Storages/MergeTree/KeyCondition.cpp index 7128558b734..b42fe49a1d0 100644 --- a/src/Storages/MergeTree/KeyCondition.cpp +++ b/src/Storages/MergeTree/KeyCondition.cpp @@ -854,6 +854,7 @@ static NameSet getAllSubexpressionNames(const ExpressionActions & key_expr) KeyCondition::KeyCondition( const ASTPtr & query, + const ASTs & additional_filter_asts, TreeRewriterResultPtr syntax_analyzer_result, PreparedSetsPtr prepared_sets_, ContextPtr context, @@ -883,13 +884,35 @@ KeyCondition::KeyCondition( array_joined_columns.insert(name); const ASTSelectQuery & select = query->as(); - if (select.where() || select.prewhere()) + + ASTs filters; + if (select.where()) + filters.push_back(select.where()); + + if (select.prewhere()) + filters.push_back(select.prewhere()); + + for (const auto & filter_ast : additional_filter_asts) + filters.push_back(filter_ast); + + if (!filters.empty()) { ASTPtr filter_query; - if (select.where() && select.prewhere()) - filter_query = makeASTFunction("and", select.where(), select.prewhere()); + if (filters.size() == 1) + { + filter_query = filters.front(); + } else - filter_query = select.where() ? select.where() : select.prewhere(); + { + auto function = std::make_shared(); + + function->name = "and"; + function->arguments = std::make_shared(); + function->children.push_back(function->arguments); + function->arguments->children = std::move(filters); + + filter_query = function; + } /** When non-strictly monotonic functions are employed in functional index (e.g. ORDER BY toStartOfHour(dateTime)), * the use of NOT operator in predicate will result in the indexing algorithm leave out some data. diff --git a/src/Storages/MergeTree/KeyCondition.h b/src/Storages/MergeTree/KeyCondition.h index 3c2089a56d7..586bc43f791 100644 --- a/src/Storages/MergeTree/KeyCondition.h +++ b/src/Storages/MergeTree/KeyCondition.h @@ -208,6 +208,7 @@ public: /// Does not take into account the SAMPLE section. all_columns - the set of all columns of the table. KeyCondition( const ASTPtr & query, + const ASTs & additional_filter_asts, TreeRewriterResultPtr syntax_analyzer_result, PreparedSetsPtr prepared_sets_, ContextPtr context, @@ -223,9 +224,18 @@ public: const ExpressionActionsPtr & key_expr_, bool single_point_ = false, bool strict_ = false) - : KeyCondition(query_info.query, query_info.syntax_analyzer_result, query_info.prepared_sets, - context, key_column_names, key_expr_, single_point_, strict_) - {} + : KeyCondition( + query_info.query, + query_info.filter_asts, + query_info.syntax_analyzer_result, + query_info.prepared_sets, + context, + key_column_names, + key_expr_, + single_point_, + strict_) + { + } KeyCondition( ActionDAGNodes dag_nodes, diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 594b4a32f9c..63705fbcdf5 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -5142,8 +5142,7 @@ static void selectBestProjection( const MergeTreeDataSelectExecutor & reader, const StorageSnapshotPtr & storage_snapshot, const SelectQueryInfo & query_info, - const ActionsDAGPtr & added_filter, - const std::string & added_filter_column_name, + const ActionDAGNodes & added_filter_nodes, const Names & required_columns, ProjectionCandidate & candidate, ContextPtr query_context, @@ -5174,8 +5173,7 @@ static void selectBestProjection( storage_snapshot->metadata, candidate.desc->metadata, query_info, - added_filter, - added_filter_column_name, + added_filter_nodes, query_context, settings.max_threads, max_added_blocks); @@ -5198,8 +5196,7 @@ static void selectBestProjection( storage_snapshot->metadata, storage_snapshot->metadata, query_info, // TODO syntax_analysis_result set in index - added_filter, - added_filter_column_name, + added_filter_nodes, query_context, settings.max_threads, max_added_blocks); @@ -5524,6 +5521,14 @@ std::optional MergeTreeData::getQueryProcessingStageWithAgg const auto & before_where = analysis_result.before_where; const auto & where_column_name = analysis_result.where_column_name; + /// For PK analysis + ActionDAGNodes added_filter_nodes; + if (auto additional_filter_info = select.getAdditionalQueryInfo()) + added_filter_nodes.nodes.push_back(&additional_filter_info->actions->findInOutputs(additional_filter_info->column_name)); + + if (before_where) + added_filter_nodes.nodes.push_back(&before_where->findInOutputs(where_column_name)); + bool can_use_aggregate_projection = true; /// If the first stage of the query pipeline is more complex than Aggregating - Expression - Filter - ReadFromStorage, /// we cannot use aggregate projection. @@ -5750,7 +5755,7 @@ std::optional MergeTreeData::getQueryProcessingStageWithAgg query_info.minmax_count_projection_block = getMinMaxCountProjectionBlock( metadata_snapshot, minmax_count_projection_candidate->required_columns, - analysis_result.prewhere_info || analysis_result.before_where, + !query_info.filter_asts.empty() || analysis_result.prewhere_info || analysis_result.before_where, query_info, parts, normal_parts, @@ -5792,8 +5797,7 @@ std::optional MergeTreeData::getQueryProcessingStageWithAgg metadata_snapshot, metadata_snapshot, query_info, - before_where, - where_column_name, + added_filter_nodes, query_context, settings.max_threads, max_added_blocks); @@ -5825,8 +5829,7 @@ std::optional MergeTreeData::getQueryProcessingStageWithAgg metadata_snapshot, metadata_snapshot, query_info, - before_where, - where_column_name, + added_filter_nodes, query_context, settings.max_threads, max_added_blocks); @@ -5852,8 +5855,7 @@ std::optional MergeTreeData::getQueryProcessingStageWithAgg reader, storage_snapshot, query_info, - before_where, - where_column_name, + added_filter_nodes, analysis_result.required_columns, candidate, query_context, @@ -5874,8 +5876,7 @@ std::optional MergeTreeData::getQueryProcessingStageWithAgg reader, storage_snapshot, query_info, - before_where, - where_column_name, + added_filter_nodes, analysis_result.required_columns, candidate, query_context, diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index ba3505b5886..c5f546a9c36 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1273,8 +1273,7 @@ MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar const StorageMetadataPtr & metadata_snapshot_base, const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & query_info, - const ActionsDAGPtr & added_filter, - const std::string & added_filter_column_name, + const ActionDAGNodes & added_filter_nodes, ContextPtr context, unsigned num_streams, std::shared_ptr max_block_numbers_to_read) const @@ -1295,8 +1294,7 @@ MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar return ReadFromMergeTree::selectRangesToRead( std::move(parts), query_info.prewhere_info, - added_filter, - added_filter_column_name, + added_filter_nodes, metadata_snapshot_base, metadata_snapshot, query_info, diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index 899cf1f2862..bb44f260eec 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -60,8 +60,7 @@ public: const StorageMetadataPtr & metadata_snapshot_base, const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & query_info, - const ActionsDAGPtr & added_filter, - const std::string & added_filter_column_name, + const ActionDAGNodes & added_filter_nodes, ContextPtr context, unsigned num_streams, std::shared_ptr max_block_numbers_to_read = nullptr) const; diff --git a/src/Storages/MergeTree/MergeTreeIndexMinMax.cpp b/src/Storages/MergeTree/MergeTreeIndexMinMax.cpp index 05319ecc62e..b190ac2b2fd 100644 --- a/src/Storages/MergeTree/MergeTreeIndexMinMax.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexMinMax.cpp @@ -157,9 +157,7 @@ void MergeTreeIndexAggregatorMinMax::update(const Block & block, size_t * pos, s MergeTreeIndexConditionMinMax::MergeTreeIndexConditionMinMax( - const IndexDescription & index, - const SelectQueryInfo & query, - ContextPtr context) + const IndexDescription & index, const SelectQueryInfo & query, ContextPtr context) : index_data_types(index.data_types) , condition(query, context, index.column_names, index.expression) { diff --git a/src/Storages/MergeTree/PartitionPruner.h b/src/Storages/MergeTree/PartitionPruner.h index 675fef1433d..9953c52b593 100644 --- a/src/Storages/MergeTree/PartitionPruner.h +++ b/src/Storages/MergeTree/PartitionPruner.h @@ -26,9 +26,7 @@ private: public: PartitionPruner(const StorageMetadataPtr & metadata, const SelectQueryInfo & query_info, ContextPtr context, bool strict) : partition_key(MergeTreePartition::adjustPartitionKey(metadata, context)) - , partition_condition( - query_info.query, query_info.syntax_analyzer_result, query_info.prepared_sets, - context, partition_key.column_names, partition_key.expression, true /* single_point */, strict) + , partition_condition(query_info, context, partition_key.column_names, partition_key.expression, true /* single_point */, strict) , useless(strict ? partition_condition.anyUnknownOrAlwaysTrue() : partition_condition.alwaysUnknownOrTrue()) { } diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index 4a3db2e8497..909da5bebba 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -156,9 +156,11 @@ struct SelectQueryInfo TreeRewriterResultPtr syntax_analyzer_result; /// This is an additional filer applied to current table. - /// It is needed only for additional PK filtering. ASTPtr additional_filter_ast; + /// It is needed for PK analysis based on row_level_policy and additional_filters. + ASTs filter_asts; + ReadInOrderOptimizerPtr order_optimizer; /// Can be modified while reading from storage InputOrderInfoPtr input_order_info; diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index 666717e50a0..2bedf406b7d 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -542,7 +542,7 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources( return {}; if (auto * read_from_merge_tree = typeid_cast(plan.getRootNode()->step.get())) - read_from_merge_tree->addFilter(added_filter, added_filter_column_name); + read_from_merge_tree->addFilterNodes(added_filter_nodes); builder = plan.buildQueryPipeline( QueryPlanOptimizationSettings::fromContext(modified_context), diff --git a/src/Storages/StorageMerge.h b/src/Storages/StorageMerge.h index d2f94ac6b88..6bf68660803 100644 --- a/src/Storages/StorageMerge.h +++ b/src/Storages/StorageMerge.h @@ -140,8 +140,8 @@ public: void addFilter(ActionsDAGPtr expression, std::string column_name) { - added_filter = std::move(expression); - added_filter_column_name = std::move(column_name); + added_filter_dags.push_back(expression); + added_filter_nodes.nodes.push_back(&expression->findInOutputs(column_name)); } private: @@ -160,7 +160,9 @@ private: ContextMutablePtr context; QueryProcessingStage::Enum common_processed_stage; - ActionsDAGPtr added_filter; + std::vector added_filter_dags; + ActionDAGNodes added_filter_nodes; + std::string added_filter_column_name; struct AliasData diff --git a/tests/queries/0_stateless/01710_projection_additional_filters.reference b/tests/queries/0_stateless/01710_projection_additional_filters.reference new file mode 100644 index 00000000000..06b63ea6c2f --- /dev/null +++ b/tests/queries/0_stateless/01710_projection_additional_filters.reference @@ -0,0 +1 @@ +0 0 0 diff --git a/tests/queries/0_stateless/01710_projection_additional_filters.sql b/tests/queries/0_stateless/01710_projection_additional_filters.sql new file mode 100644 index 00000000000..1633b48ba7e --- /dev/null +++ b/tests/queries/0_stateless/01710_projection_additional_filters.sql @@ -0,0 +1,9 @@ +DROP TABLE IF EXISTS t; + +CREATE TABLE t(a UInt32, b UInt32) ENGINE = MergeTree PARTITION BY a ORDER BY a; + +INSERT INTO t SELECT number % 10, number FROM numbers(10000); + +SELECT count(), min(a), max(a) FROM t SETTINGS additional_table_filters = {'t' : '0'}; + +DROP TABLE t; diff --git a/tests/queries/0_stateless/01710_projection_row_policy.reference b/tests/queries/0_stateless/01710_projection_row_policy.reference new file mode 100644 index 00000000000..06b63ea6c2f --- /dev/null +++ b/tests/queries/0_stateless/01710_projection_row_policy.reference @@ -0,0 +1 @@ +0 0 0 diff --git a/tests/queries/0_stateless/01710_projection_row_policy.sql b/tests/queries/0_stateless/01710_projection_row_policy.sql new file mode 100644 index 00000000000..a54cc50b9e9 --- /dev/null +++ b/tests/queries/0_stateless/01710_projection_row_policy.sql @@ -0,0 +1,13 @@ +DROP TABLE IF EXISTS t; + +CREATE TABLE t(a UInt32, b UInt32) ENGINE = MergeTree PARTITION BY a ORDER BY a; + +INSERT INTO t SELECT number % 10, number FROM numbers(10000); + +CREATE ROW POLICY OR REPLACE rp ON t FOR SELECT USING 0 TO ALL; + +SELECT count(), min(a), max(a) FROM t; + +DROP ROW POLICY rp ON t; + +DROP TABLE t;