Fix KeyCondition with other filters

This commit is contained in:
Amos Bird 2022-07-28 17:40:09 +08:00
parent 2f77266962
commit fa8fab2e8f
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
19 changed files with 153 additions and 73 deletions

View File

@ -608,11 +608,15 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (storage)
{
query_info.filter_asts.clear();
/// Fix source_header for filter actions.
if (row_policy_filter)
{
filter_info = generateFilterActions(
table_id, row_policy_filter, context, storage, storage_snapshot, metadata_snapshot, required_columns);
query_info.filter_asts.push_back(row_policy_filter);
}
if (query_info.additional_filter_ast)
@ -621,6 +625,8 @@ InterpreterSelectQuery::InterpreterSelectQuery(
table_id, query_info.additional_filter_ast, context, storage, storage_snapshot, metadata_snapshot, required_columns);
additional_filter_info->do_remove_column = true;
query_info.filter_asts.push_back(query_info.additional_filter_ast);
}
source_header = storage_snapshot->getSampleBlockForColumns(required_columns);
@ -2002,8 +2008,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
&& storage
&& storage->getName() != "MaterializedMySQL"
&& !storage->hasLightweightDeletedMask()
&& !row_policy_filter
&& !query_info.additional_filter_ast
&& query_info.filter_asts.empty()
&& processing_stage == QueryProcessingStage::FetchColumns
&& query_analyzer->hasAggregation()
&& (query_analyzer->aggregates().size() == 1)
@ -2103,7 +2108,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
&& !query.limit_with_ties
&& !query.prewhere()
&& !query.where()
&& !query_info.additional_filter_ast
&& query_info.filter_asts.empty()
&& !query.groupBy()
&& !query.having()
&& !query.orderBy()

View File

@ -127,6 +127,8 @@ public:
/// It will set shard_num and shard_count to the client_info
void setProperClientInfo(size_t replica_num, size_t replica_count);
/// Returns the stored additional_filter_info.
/// NOTE(review): presumably null when no additional filter was built for the query — confirm against the constructor.
FilterDAGInfoPtr getAdditionalQueryInfo() const { return additional_filter_info; }
static SortDescription getSortDescription(const ASTSelectQuery & query, const ContextPtr & context);
static UInt64 getLimitForSorting(const ASTSelectQuery & query, const ContextPtr & context);

View File

@ -1,9 +1,10 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Storages/StorageMerge.h>
#include <Interpreters/ActionsDAG.h>
#include <stack>
#include <deque>
namespace DB::QueryPlanOptimizations
{
@ -16,33 +17,41 @@ void optimizePrimaryKeyCondition(QueryPlan::Node & root)
size_t next_child = 0;
};
std::stack<Frame> stack;
stack.push({.node = &root});
std::deque<Frame> stack;
stack.push_back({.node = &root});
while (!stack.empty())
{
auto & frame = stack.top();
auto & frame = stack.back();
/// Traverse all children first.
if (frame.next_child < frame.node->children.size())
{
stack.push({.node = frame.node->children[frame.next_child]});
stack.push_back({.node = frame.node->children[frame.next_child]});
++frame.next_child;
continue;
}
if (auto * filter_step = typeid_cast<FilterStep *>(frame.node->step.get()))
auto add_filter = [&](auto & storage)
{
auto * child = frame.node->children.at(0);
if (auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(child->step.get()))
read_from_merge_tree->addFilter(filter_step->getExpression(), filter_step->getFilterColumnName());
for (auto iter=stack.rbegin() + 1; iter!=stack.rend(); ++iter)
{
if (auto * filter_step = typeid_cast<FilterStep *>(iter->node->step.get()))
storage.addFilter(filter_step->getExpression(), filter_step->getFilterColumnName());
else if (typeid_cast<ExpressionStep *>(iter->node->step.get()))
;
else
break;
}
};
if (auto * read_from_merge = typeid_cast<ReadFromMerge *>(child->step.get()))
read_from_merge->addFilter(filter_step->getExpression(), filter_step->getFilterColumnName());
}
if (auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(frame.node->step.get()))
add_filter(*read_from_merge_tree);
else if (auto * read_from_merge = typeid_cast<ReadFromMerge *>(frame.node->step.get()))
add_filter(*read_from_merge);
stack.pop();
stack.pop_back();
}
}

View File

@ -835,8 +835,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(Merge
return selectRangesToRead(
std::move(parts),
prewhere_info,
added_filter,
added_filter_column_name,
added_filter_nodes,
storage_snapshot->metadata,
storage_snapshot->getMetadataForQuery(),
query_info,
@ -852,8 +851,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(Merge
MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
MergeTreeData::DataPartsVector parts,
const PrewhereInfoPtr & prewhere_info,
const ActionsDAGPtr & added_filter,
const std::string & added_filter_column_name,
const ActionDAGNodes & added_filter_nodes,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
@ -895,17 +893,23 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
ActionDAGNodes nodes;
if (prewhere_info)
{
const auto & node = prewhere_info->prewhere_actions->findInOutputs(prewhere_info->prewhere_column_name);
nodes.nodes.push_back(&node);
{
const auto & node = prewhere_info->prewhere_actions->findInOutputs(prewhere_info->prewhere_column_name);
nodes.nodes.push_back(&node);
}
if (prewhere_info->row_level_filter)
{
const auto & node = prewhere_info->row_level_filter->findInOutputs(prewhere_info->row_level_column_name);
nodes.nodes.push_back(&node);
}
}
if (added_filter)
{
const auto & node = added_filter->findInOutputs(added_filter_column_name);
nodes.nodes.push_back(&node);
}
for (const auto & node : added_filter_nodes.nodes)
nodes.nodes.push_back(node);
key_condition.emplace(std::move(nodes), query_info.syntax_analyzer_result, query_info.prepared_sets, context, primary_key_columns, primary_key.expression);
key_condition.emplace(
std::move(nodes), query_info.syntax_analyzer_result, query_info.prepared_sets, context, primary_key_columns, primary_key.expression);
}
else
{

View File

@ -116,8 +116,14 @@ public:
void addFilter(ActionsDAGPtr expression, std::string column_name)
{
added_filter = std::move(expression);
added_filter_column_name = std::move(column_name);
added_filter_dags.push_back(expression);
added_filter_nodes.nodes.push_back(&expression->findInOutputs(column_name));
}
/// Appends every node of `filter_nodes`, in order, to the filter nodes already collected in `added_filter_nodes`.
void addFilterNodes(const ActionDAGNodes & filter_nodes)
{
    auto & collected = added_filter_nodes.nodes;
    collected.insert(collected.end(), filter_nodes.nodes.begin(), filter_nodes.nodes.end());
}
StorageID getStorageID() const { return data.getStorageID(); }
@ -128,8 +134,7 @@ public:
static MergeTreeDataSelectAnalysisResultPtr selectRangesToRead(
MergeTreeData::DataPartsVector parts,
const PrewhereInfoPtr & prewhere_info,
const ActionsDAGPtr & added_filter,
const std::string & added_filter_column_name,
const ActionDAGNodes & added_filter_nodes,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
@ -160,8 +165,8 @@ private:
PrewhereInfoPtr prewhere_info;
ExpressionActionsSettings actions_settings;
ActionsDAGPtr added_filter;
std::string added_filter_column_name;
std::vector<ActionsDAGPtr> added_filter_dags;
ActionDAGNodes added_filter_nodes;
StorageSnapshotPtr storage_snapshot;
StorageMetadataPtr metadata_for_reading;

View File

@ -854,6 +854,7 @@ static NameSet getAllSubexpressionNames(const ExpressionActions & key_expr)
KeyCondition::KeyCondition(
const ASTPtr & query,
const ASTs & additional_filter_asts,
TreeRewriterResultPtr syntax_analyzer_result,
PreparedSetsPtr prepared_sets_,
ContextPtr context,
@ -883,13 +884,35 @@ KeyCondition::KeyCondition(
array_joined_columns.insert(name);
const ASTSelectQuery & select = query->as<ASTSelectQuery &>();
if (select.where() || select.prewhere())
ASTs filters;
if (select.where())
filters.push_back(select.where());
if (select.prewhere())
filters.push_back(select.prewhere());
for (const auto & filter_ast : additional_filter_asts)
filters.push_back(filter_ast);
if (!filters.empty())
{
ASTPtr filter_query;
if (select.where() && select.prewhere())
filter_query = makeASTFunction("and", select.where(), select.prewhere());
if (filters.size() == 1)
{
filter_query = filters.front();
}
else
filter_query = select.where() ? select.where() : select.prewhere();
{
auto function = std::make_shared<ASTFunction>();
function->name = "and";
function->arguments = std::make_shared<ASTExpressionList>();
function->children.push_back(function->arguments);
function->arguments->children = std::move(filters);
filter_query = function;
}
/** When non-strictly monotonic functions are employed in a functional index (e.g. ORDER BY toStartOfHour(dateTime)),
* the use of the NOT operator in a predicate will result in the indexing algorithm leaving out some data.

View File

@ -208,6 +208,7 @@ public:
/// Does not take into account the SAMPLE section. all_columns - the set of all columns of the table.
KeyCondition(
const ASTPtr & query,
const ASTs & additional_filter_asts,
TreeRewriterResultPtr syntax_analyzer_result,
PreparedSetsPtr prepared_sets_,
ContextPtr context,
@ -223,9 +224,18 @@ public:
const ExpressionActionsPtr & key_expr_,
bool single_point_ = false,
bool strict_ = false)
: KeyCondition(query_info.query, query_info.syntax_analyzer_result, query_info.prepared_sets,
context, key_column_names, key_expr_, single_point_, strict_)
{}
: KeyCondition(
query_info.query,
query_info.filter_asts,
query_info.syntax_analyzer_result,
query_info.prepared_sets,
context,
key_column_names,
key_expr_,
single_point_,
strict_)
{
}
KeyCondition(
ActionDAGNodes dag_nodes,

View File

@ -5142,8 +5142,7 @@ static void selectBestProjection(
const MergeTreeDataSelectExecutor & reader,
const StorageSnapshotPtr & storage_snapshot,
const SelectQueryInfo & query_info,
const ActionsDAGPtr & added_filter,
const std::string & added_filter_column_name,
const ActionDAGNodes & added_filter_nodes,
const Names & required_columns,
ProjectionCandidate & candidate,
ContextPtr query_context,
@ -5174,8 +5173,7 @@ static void selectBestProjection(
storage_snapshot->metadata,
candidate.desc->metadata,
query_info,
added_filter,
added_filter_column_name,
added_filter_nodes,
query_context,
settings.max_threads,
max_added_blocks);
@ -5198,8 +5196,7 @@ static void selectBestProjection(
storage_snapshot->metadata,
storage_snapshot->metadata,
query_info, // TODO syntax_analysis_result set in index
added_filter,
added_filter_column_name,
added_filter_nodes,
query_context,
settings.max_threads,
max_added_blocks);
@ -5524,6 +5521,14 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
const auto & before_where = analysis_result.before_where;
const auto & where_column_name = analysis_result.where_column_name;
/// For PK analysis
ActionDAGNodes added_filter_nodes;
if (auto additional_filter_info = select.getAdditionalQueryInfo())
added_filter_nodes.nodes.push_back(&additional_filter_info->actions->findInOutputs(additional_filter_info->column_name));
if (before_where)
added_filter_nodes.nodes.push_back(&before_where->findInOutputs(where_column_name));
bool can_use_aggregate_projection = true;
/// If the first stage of the query pipeline is more complex than Aggregating - Expression - Filter - ReadFromStorage,
/// we cannot use aggregate projection.
@ -5750,7 +5755,7 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
query_info.minmax_count_projection_block = getMinMaxCountProjectionBlock(
metadata_snapshot,
minmax_count_projection_candidate->required_columns,
analysis_result.prewhere_info || analysis_result.before_where,
!query_info.filter_asts.empty() || analysis_result.prewhere_info || analysis_result.before_where,
query_info,
parts,
normal_parts,
@ -5792,8 +5797,7 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
metadata_snapshot,
metadata_snapshot,
query_info,
before_where,
where_column_name,
added_filter_nodes,
query_context,
settings.max_threads,
max_added_blocks);
@ -5825,8 +5829,7 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
metadata_snapshot,
metadata_snapshot,
query_info,
before_where,
where_column_name,
added_filter_nodes,
query_context,
settings.max_threads,
max_added_blocks);
@ -5852,8 +5855,7 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
reader,
storage_snapshot,
query_info,
before_where,
where_column_name,
added_filter_nodes,
analysis_result.required_columns,
candidate,
query_context,
@ -5874,8 +5876,7 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
reader,
storage_snapshot,
query_info,
before_where,
where_column_name,
added_filter_nodes,
analysis_result.required_columns,
candidate,
query_context,

View File

@ -1273,8 +1273,7 @@ MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const ActionsDAGPtr & added_filter,
const std::string & added_filter_column_name,
const ActionDAGNodes & added_filter_nodes,
ContextPtr context,
unsigned num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read) const
@ -1295,8 +1294,7 @@ MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar
return ReadFromMergeTree::selectRangesToRead(
std::move(parts),
query_info.prewhere_info,
added_filter,
added_filter_column_name,
added_filter_nodes,
metadata_snapshot_base,
metadata_snapshot,
query_info,

View File

@ -60,8 +60,7 @@ public:
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const ActionsDAGPtr & added_filter,
const std::string & added_filter_column_name,
const ActionDAGNodes & added_filter_nodes,
ContextPtr context,
unsigned num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read = nullptr) const;

View File

@ -157,9 +157,7 @@ void MergeTreeIndexAggregatorMinMax::update(const Block & block, size_t * pos, s
MergeTreeIndexConditionMinMax::MergeTreeIndexConditionMinMax(
const IndexDescription & index,
const SelectQueryInfo & query,
ContextPtr context)
const IndexDescription & index, const SelectQueryInfo & query, ContextPtr context)
: index_data_types(index.data_types)
, condition(query, context, index.column_names, index.expression)
{

View File

@ -26,9 +26,7 @@ private:
public:
PartitionPruner(const StorageMetadataPtr & metadata, const SelectQueryInfo & query_info, ContextPtr context, bool strict)
: partition_key(MergeTreePartition::adjustPartitionKey(metadata, context))
, partition_condition(
query_info.query, query_info.syntax_analyzer_result, query_info.prepared_sets,
context, partition_key.column_names, partition_key.expression, true /* single_point */, strict)
, partition_condition(query_info, context, partition_key.column_names, partition_key.expression, true /* single_point */, strict)
, useless(strict ? partition_condition.anyUnknownOrAlwaysTrue() : partition_condition.alwaysUnknownOrTrue())
{
}

View File

@ -156,9 +156,11 @@ struct SelectQueryInfo
TreeRewriterResultPtr syntax_analyzer_result;
/// This is an additional filter applied to current table.
/// It is needed only for additional PK filtering.
ASTPtr additional_filter_ast;
/// It is needed for PK analysis based on row_level_policy and additional_filters.
ASTs filter_asts;
ReadInOrderOptimizerPtr order_optimizer;
/// Can be modified while reading from storage
InputOrderInfoPtr input_order_info;

View File

@ -542,7 +542,7 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
return {};
if (auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(plan.getRootNode()->step.get()))
read_from_merge_tree->addFilter(added_filter, added_filter_column_name);
read_from_merge_tree->addFilterNodes(added_filter_nodes);
builder = plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(modified_context),

View File

@ -140,8 +140,8 @@ public:
void addFilter(ActionsDAGPtr expression, std::string column_name)
{
added_filter = std::move(expression);
added_filter_column_name = std::move(column_name);
added_filter_dags.push_back(expression);
added_filter_nodes.nodes.push_back(&expression->findInOutputs(column_name));
}
private:
@ -160,7 +160,9 @@ private:
ContextMutablePtr context;
QueryProcessingStage::Enum common_processed_stage;
ActionsDAGPtr added_filter;
std::vector<ActionsDAGPtr> added_filter_dags;
ActionDAGNodes added_filter_nodes;
std::string added_filter_column_name;
struct AliasData

View File

@ -0,0 +1,9 @@
-- Regression test: count()/min()/max() over a MergeTree table when an additional table filter is set.
DROP TABLE IF EXISTS t;
-- The table is partitioned and ordered by the same column `a`.
CREATE TABLE t(a UInt32, b UInt32) ENGINE = MergeTree PARTITION BY a ORDER BY a;
-- 10000 rows; `a` takes the values 0..9, `b` is the row number.
INSERT INTO t SELECT number % 10, number FROM numbers(10000);
-- The additional filter for table `t` is the constant expression '0', so no row should pass it.
SELECT count(), min(a), max(a) FROM t SETTINGS additional_table_filters = {'t' : '0'};
DROP TABLE t;

View File

@ -0,0 +1 @@
0 0 0

View File

@ -0,0 +1,13 @@
-- Regression test: count()/min()/max() over a MergeTree table when a row policy restricts SELECT.
DROP TABLE IF EXISTS t;
-- The table is partitioned and ordered by the same column `a`.
CREATE TABLE t(a UInt32, b UInt32) ENGINE = MergeTree PARTITION BY a ORDER BY a;
-- 10000 rows; `a` takes the values 0..9, `b` is the row number.
INSERT INTO t SELECT number % 10, number FROM numbers(10000);
-- The policy condition is the constant 0 and it applies to all users, so no row should be visible to SELECT.
CREATE ROW POLICY OR REPLACE rp ON t FOR SELECT USING 0 TO ALL;
SELECT count(), min(a), max(a) FROM t;
-- Remove the policy before dropping the table so it does not outlive the test.
DROP ROW POLICY rp ON t;
DROP TABLE t;