Merge pull request #60196 from azat/preliminary-filters-fix

Fix actions execution during preliminary filtering (PK, partition pruning)
This commit is contained in:
Julia Kartseva 2024-02-29 10:17:05 -08:00 committed by GitHub
commit 891689a415
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 44 additions and 16 deletions

View File

@ -566,7 +566,7 @@ namespace
};
}
static void executeAction(const ExpressionActions::Action & action, ExecutionContext & execution_context, bool dry_run)
static void executeAction(const ExpressionActions::Action & action, ExecutionContext & execution_context, bool dry_run, bool allow_duplicates_in_input)
{
auto & inputs = execution_context.inputs;
auto & columns = execution_context.columns;
@ -697,14 +697,19 @@ static void executeAction(const ExpressionActions::Action & action, ExecutionCon
action.node->result_name);
}
else
columns[action.result_position] = std::move(inputs[pos]);
{
if (allow_duplicates_in_input)
columns[action.result_position] = inputs[pos];
else
columns[action.result_position] = std::move(inputs[pos]);
}
break;
}
}
}
void ExpressionActions::execute(Block & block, size_t & num_rows, bool dry_run) const
void ExpressionActions::execute(Block & block, size_t & num_rows, bool dry_run, bool allow_duplicates_in_input) const
{
ExecutionContext execution_context
{
@ -725,7 +730,8 @@ void ExpressionActions::execute(Block & block, size_t & num_rows, bool dry_run)
if (execution_context.inputs_pos[input_pos] < 0)
{
execution_context.inputs_pos[input_pos] = pos;
break;
if (!allow_duplicates_in_input)
break;
}
}
}
@ -737,12 +743,8 @@ void ExpressionActions::execute(Block & block, size_t & num_rows, bool dry_run)
{
try
{
executeAction(action, execution_context, dry_run);
executeAction(action, execution_context, dry_run, allow_duplicates_in_input);
checkLimits(execution_context.columns);
//std::cerr << "Action: " << action.toString() << std::endl;
//for (const auto & col : execution_context.columns)
// std::cerr << col.dumpStructure() << std::endl;
}
catch (Exception & e)
{
@ -755,6 +757,12 @@ void ExpressionActions::execute(Block & block, size_t & num_rows, bool dry_run)
{
block.clear();
}
else if (allow_duplicates_in_input)
{
/// This case is the same as when the input is projected
/// since we do not need any input columns.
block.clear();
}
else
{
::sort(execution_context.inputs_pos.rbegin(), execution_context.inputs_pos.rend());
@ -777,11 +785,11 @@ void ExpressionActions::execute(Block & block, size_t & num_rows, bool dry_run)
num_rows = execution_context.num_rows;
}
void ExpressionActions::execute(Block & block, bool dry_run) const
void ExpressionActions::execute(Block & block, bool dry_run, bool allow_duplicates_in_input) const
{
size_t num_rows = block.rows();
execute(block, num_rows, dry_run);
execute(block, num_rows, dry_run, allow_duplicates_in_input);
if (!block)
block.insert({DataTypeUInt8().createColumnConst(num_rows, 0), std::make_shared<DataTypeUInt8>(), "_dummy"});

View File

@ -98,9 +98,15 @@ public:
const NamesAndTypesList & getRequiredColumnsWithTypes() const { return required_columns; }
/// Execute the expression on the block. The block must contain all the columns returned by getRequiredColumns.
void execute(Block & block, size_t & num_rows, bool dry_run = false) const;
///
/// @param allow_duplicates_in_input - allow the actions to use the same
/// input column more than once (every such input then refers to the same
/// column of the block, which is copied instead of moved). This is needed
/// for preliminary query filtering (filterBlockWithDAG()), because it just
/// passes the available virtual columns, which cannot be moved out of the
/// block in case they are used multiple times.
void execute(Block & block, size_t & num_rows, bool dry_run = false, bool allow_duplicates_in_input = false) const;
/// The same, but without `num_rows`. If result block is empty, adds `_dummy` column to keep block size.
void execute(Block & block, bool dry_run = false) const;
void execute(Block & block, bool dry_run = false, bool allow_duplicates_in_input = false) const;
bool hasArrayJoin() const;
void assertDeterministic() const;

View File

@ -223,7 +223,7 @@ bool prepareFilterBlockWithQuery(const ASTPtr & query, ContextPtr context, Block
auto expression_actions = std::make_shared<ExpressionActions>(actions);
auto block_with_constants = block;
expression_actions->execute(block_with_constants);
expression_actions->execute(block_with_constants, /*dry_run=*/ false, /*allow_duplicates_in_input=*/ true);
return block_with_constants.has(expr_column_name) && isColumnConst(*block_with_constants.getByName(expr_column_name).column);
};
@ -266,7 +266,7 @@ void filterBlockWithDAG(ActionsDAGPtr dag, Block & block, ContextPtr context)
auto actions = std::make_shared<ExpressionActions>(dag);
makeSets(actions, context);
Block block_with_filter = block;
actions->execute(block_with_filter);
actions->execute(block_with_filter, /*dry_run=*/ false, /*allow_duplicates_in_input=*/ true);
/// Filter the block.
String filter_column_name = dag->getOutputs().at(0)->result_name;
@ -313,7 +313,7 @@ void filterBlockWithQuery(const ASTPtr & query, Block & block, ContextPtr contex
makeSets(actions, context);
Block block_with_filter = block;
actions->execute(block_with_filter);
actions->execute(block_with_filter, /*dry_run=*/ false, /*allow_duplicates_in_input=*/ true);
/// Filter the block.
String filter_column_name = expression_ast->getColumnName();

View File

@ -0,0 +1,6 @@
-- It is special because actions cannot be reused for SimpleAggregateFunction (see https://github.com/ClickHouse/ClickHouse/pull/54436)
-- NOTE(review): the table below has a plain Int key, not a SimpleAggregateFunction column - confirm that the sentence above applies to this test.
drop table if exists data;
-- Single-column table with no partitioning clause and an empty sorting key.
create table data (key Int) engine=AggregatingMergeTree() order by tuple();
insert into data values (0);
-- The same predicate on the virtual column _partition_id appears twice in one PREWHERE expression.
select * from data final prewhere indexHint(_partition_id = 'all') or indexHint(_partition_id = 'all');
-- The same, with the predicate repeated three times.
select * from data final prewhere indexHint(_partition_id = 'all') or indexHint(_partition_id = 'all') or indexHint(_partition_id = 'all');

View File

@ -0,0 +1,5 @@
-- It is special because actions cannot be reused for SimpleAggregateFunction (see https://github.com/ClickHouse/ClickHouse/pull/54436)
drop table if exists data;
-- The only column, key, has type SimpleAggregateFunction(max, Int); the sorting key is empty.
create table data (key SimpleAggregateFunction(max, Int)) engine=AggregatingMergeTree() order by tuple();
insert into data values (0);
-- PREWHERE combines a predicate on the virtual column _partition_id with a predicate on key; WHERE then filters on key again.
select * from data final prewhere indexHint(_partition_id = 'all') and key >= -1 where key >= 0;