mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-29 13:10:48 +00:00
Refactor row level security actions.
This commit is contained in:
parent
45e90961f7
commit
ae73600fb0
@ -80,14 +80,14 @@ ActionsDAG::Node & ActionsDAG::getNode(const std::string & name)
|
||||
return **it;
|
||||
}
|
||||
|
||||
const ActionsDAG::Node & ActionsDAG::addInput(std::string name, DataTypePtr type, bool can_replace)
|
||||
const ActionsDAG::Node & ActionsDAG::addInput(std::string name, DataTypePtr type, bool can_replace, bool add_to_index)
|
||||
{
|
||||
Node node;
|
||||
node.type = ActionType::INPUT;
|
||||
node.result_type = std::move(type);
|
||||
node.result_name = std::move(name);
|
||||
|
||||
return addNode(std::move(node), can_replace);
|
||||
return addNode(std::move(node), can_replace, add_to_index);
|
||||
}
|
||||
|
||||
const ActionsDAG::Node & ActionsDAG::addInput(ColumnWithTypeAndName column, bool can_replace)
|
||||
|
@ -196,7 +196,7 @@ public:
|
||||
std::string dumpNames() const;
|
||||
std::string dumpDAG() const;
|
||||
|
||||
const Node & addInput(std::string name, DataTypePtr type, bool can_replace = false);
|
||||
const Node & addInput(std::string name, DataTypePtr type, bool can_replace = false, bool add_to_index = true);
|
||||
const Node & addInput(ColumnWithTypeAndName column, bool can_replace = false);
|
||||
const Node & addColumn(ColumnWithTypeAndName column, bool can_replace = false, bool materialize = false);
|
||||
const Node & addAlias(const std::string & name, std::string alias, bool can_replace = false);
|
||||
@ -220,7 +220,7 @@ public:
|
||||
/// Return true if column was removed from inputs.
|
||||
bool removeUnusedResult(const std::string & column_name);
|
||||
|
||||
void projectInput() { settings.project_input = true; }
|
||||
void projectInput(bool project = true) { settings.project_input = project; }
|
||||
void removeUnusedActions(const Names & required_names);
|
||||
|
||||
bool hasArrayJoin() const;
|
||||
|
@ -855,6 +855,10 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendPrewhere(
|
||||
if (!select_query->prewhere())
|
||||
return prewhere_actions;
|
||||
|
||||
Names first_action_names;
|
||||
if (!chain.steps.empty())
|
||||
first_action_names = chain.steps.front()->getRequiredColumns().getNames();
|
||||
|
||||
auto & step = chain.lastStep(sourceColumns());
|
||||
getRootActions(select_query->prewhere(), only_types, step.actions());
|
||||
String prewhere_column_name = select_query->prewhere()->getColumnName();
|
||||
@ -879,6 +883,7 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendPrewhere(
|
||||
auto tmp_actions = std::make_shared<ExpressionActions>(tmp_actions_dag);
|
||||
auto required_columns = tmp_actions->getRequiredColumns();
|
||||
NameSet required_source_columns(required_columns.begin(), required_columns.end());
|
||||
required_source_columns.insert(first_action_names.begin(), first_action_names.end());
|
||||
|
||||
/// Add required columns to required output in order not to remove them after prewhere execution.
|
||||
/// TODO: add sampling and final execution to common chain.
|
||||
@ -1579,6 +1584,7 @@ void ExpressionAnalysisResult::finalize(const ExpressionActionsChain & chain, si
|
||||
{
|
||||
const ExpressionActionsChain::Step & step = *chain.steps.at(next_step_i++);
|
||||
prewhere_info->remove_prewhere_column = step.can_remove_required_output.at(0);
|
||||
prewhere_info->prewhere_actions->projectInput(false);
|
||||
|
||||
NameSet columns_to_remove;
|
||||
for (size_t i = 1; i < step.required_output.size(); ++i)
|
||||
|
@ -107,6 +107,10 @@ namespace ErrorCodes
|
||||
String InterpreterSelectQuery::generateFilterActions(
|
||||
ActionsDAGPtr & actions, const ASTPtr & row_policy_filter, const Names & prerequisite_columns) const
|
||||
{
|
||||
std::cerr << "----- InterpreterSelectQuery::generateFilterActions\n";
|
||||
for (const auto & name : prerequisite_columns)
|
||||
std::cerr << name << std::endl;
|
||||
|
||||
const auto & db_name = table_id.getDatabaseName();
|
||||
const auto & table_name = table_id.getTableName();
|
||||
|
||||
@ -141,6 +145,7 @@ String InterpreterSelectQuery::generateFilterActions(
|
||||
auto syntax_result = TreeRewriter(*context).analyzeSelect(query_ast, TreeRewriterResult({}, storage, metadata_snapshot));
|
||||
SelectQueryExpressionAnalyzer analyzer(query_ast, syntax_result, *context, metadata_snapshot);
|
||||
actions = analyzer.simpleSelectActions();
|
||||
//std::cerr << actions->
|
||||
|
||||
return expr_list->children.at(0)->getColumnName();
|
||||
}
|
||||
@ -524,6 +529,10 @@ void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan)
|
||||
{
|
||||
executeImpl(query_plan, input, std::move(input_pipe));
|
||||
|
||||
WriteBufferFromOwnString buf;
|
||||
query_plan.explainPlan(buf, {.header = true, .actions = true});
|
||||
std::cerr << buf.str();
|
||||
|
||||
/// We must guarantee that result structure is the same as in getSampleBlock()
|
||||
if (!blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
|
||||
{
|
||||
@ -811,20 +820,54 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
|
||||
bool intermediate_stage = false;
|
||||
bool to_aggregation_stage = false;
|
||||
bool from_aggregation_stage = false;
|
||||
const bool execute_row_level_filter_in_prewhere = (
|
||||
(
|
||||
settings.optimize_move_to_prewhere || // ...when it is allowed to move things to prewhere, so we do it for row-level filter actions too.
|
||||
expressions.prewhere_info // ...or when we already have prewhere and must execute row-level filter before it.
|
||||
) &&
|
||||
!input && !input_pipe && storage && storage->supportsPrewhere() // Check that prewhere can be used at all.
|
||||
);
|
||||
|
||||
if (expressions.filter_info)
|
||||
{
|
||||
if (!expressions.prewhere_info)
|
||||
{
|
||||
const bool does_storage_support_prewhere = !input && !input_pipe && storage && storage->supportsPrewhere();
|
||||
if (does_storage_support_prewhere && settings.optimize_move_to_prewhere)
|
||||
{
|
||||
/// Execute row level filter in prewhere as a part of "move to prewhere" optimization.
|
||||
expressions.prewhere_info = std::make_shared<PrewhereDAGInfo>(
|
||||
std::move(expressions.filter_info->actions),
|
||||
std::move(expressions.filter_info->column_name));
|
||||
expressions.prewhere_info->remove_prewhere_column = expressions.filter_info->do_remove_column;
|
||||
expressions.prewhere_info->need_filter = true;
|
||||
expressions.filter_info = nullptr;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Add row level security actions to prewhere.
|
||||
std::cerr << expressions.filter_info->actions->dumpDAG() << std::endl;
|
||||
expressions.prewhere_info->row_level_filter_actions = std::move(expressions.filter_info->actions);
|
||||
expressions.prewhere_info->row_level_column_name = std::move(expressions.filter_info->column_name);
|
||||
expressions.prewhere_info->row_level_filter_actions->projectInput(false);
|
||||
if (expressions.filter_info->do_remove_column)
|
||||
{
|
||||
/// Instead of removing column, add it to prewhere_actions input (but not in index).
|
||||
/// It will be removed at prewhere_actions execution.
|
||||
const auto & index = expressions.prewhere_info->row_level_filter_actions->getIndex();
|
||||
auto it = index.find(expressions.prewhere_info->row_level_column_name);
|
||||
if (it == index.end())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found column {} in row level security filter {}",
|
||||
expressions.prewhere_info->row_level_column_name, expressions.prewhere_info->row_level_filter_actions->dumpDAG());
|
||||
const auto & node = *it;
|
||||
|
||||
expressions.prewhere_info->prewhere_actions->addInput(node->result_name, node->result_type, true, false);
|
||||
}
|
||||
|
||||
expressions.filter_info = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
if (options.only_analyze)
|
||||
{
|
||||
auto read_nothing = std::make_unique<ReadNothingStep>(source_header);
|
||||
query_plan.addStep(std::move(read_nothing));
|
||||
|
||||
if (expressions.filter_info && execute_row_level_filter_in_prewhere)
|
||||
if (expressions.filter_info)
|
||||
{
|
||||
auto row_level_security_step = std::make_unique<FilterStep>(
|
||||
query_plan.getCurrentDataStream(),
|
||||
@ -832,12 +875,24 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
|
||||
expressions.filter_info->column_name,
|
||||
expressions.filter_info->do_remove_column);
|
||||
|
||||
row_level_security_step->setStepDescription("Row-level security filter (PREWHERE)");
|
||||
row_level_security_step->setStepDescription("Row-level security filter");
|
||||
query_plan.addStep(std::move(row_level_security_step));
|
||||
}
|
||||
|
||||
if (expressions.prewhere_info)
|
||||
{
|
||||
if (expressions.prewhere_info->row_level_filter_actions)
|
||||
{
|
||||
auto row_level_filter_step = std::make_unique<FilterStep>(
|
||||
query_plan.getCurrentDataStream(),
|
||||
expressions.prewhere_info->row_level_filter_actions,
|
||||
expressions.prewhere_info->row_level_column_name,
|
||||
false);
|
||||
|
||||
row_level_filter_step->setStepDescription("Row-level security filter (PREWHERE)");
|
||||
query_plan.addStep(std::move(row_level_filter_step));
|
||||
}
|
||||
|
||||
auto prewhere_step = std::make_unique<FilterStep>(
|
||||
query_plan.getCurrentDataStream(),
|
||||
expressions.prewhere_info->prewhere_actions,
|
||||
@ -887,7 +942,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
|
||||
to_aggregation_stage = true;
|
||||
|
||||
/// Read the data from Storage. from_stage - to what stage the request was completed in Storage.
|
||||
executeFetchColumns(from_stage, query_plan, execute_row_level_filter_in_prewhere);
|
||||
executeFetchColumns(from_stage, query_plan);
|
||||
|
||||
LOG_TRACE(log, "{} -> {}", QueryProcessingStage::toString(from_stage), QueryProcessingStage::toString(options.to_stage));
|
||||
}
|
||||
@ -952,7 +1007,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
|
||||
|
||||
if (expressions.first_stage)
|
||||
{
|
||||
if (expressions.filter_info && !execute_row_level_filter_in_prewhere)
|
||||
if (expressions.filter_info)
|
||||
{
|
||||
auto row_level_security_step = std::make_unique<FilterStep>(
|
||||
query_plan.getCurrentDataStream(),
|
||||
@ -1211,30 +1266,6 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(QueryPlan & query_plan, c
|
||||
{
|
||||
auto & prewhere_info = *query_info.prewhere_info;
|
||||
|
||||
if (prewhere_info.filter_info)
|
||||
{
|
||||
auto & filter_info = *prewhere_info.filter_info;
|
||||
|
||||
if (filter_info.alias_actions)
|
||||
{
|
||||
pipe.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<ExpressionTransform>(
|
||||
header,
|
||||
filter_info.alias_actions);
|
||||
});
|
||||
}
|
||||
|
||||
pipe.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<FilterTransform>(
|
||||
header,
|
||||
filter_info.actions,
|
||||
filter_info.column_name,
|
||||
filter_info.do_remove_column);
|
||||
});
|
||||
}
|
||||
|
||||
if (prewhere_info.alias_actions)
|
||||
{
|
||||
pipe.addSimpleTransform([&](const Block & header)
|
||||
@ -1245,6 +1276,18 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(QueryPlan & query_plan, c
|
||||
});
|
||||
}
|
||||
|
||||
if (prewhere_info.row_level_filter)
|
||||
{
|
||||
pipe.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<FilterTransform>(
|
||||
header,
|
||||
prewhere_info.row_level_filter,
|
||||
prewhere_info.row_level_column_name,
|
||||
false);
|
||||
});
|
||||
}
|
||||
|
||||
pipe.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<FilterTransform>(
|
||||
@ -1274,7 +1317,7 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(QueryPlan & query_plan, c
|
||||
query_plan.addStep(std::move(read_from_pipe));
|
||||
}
|
||||
|
||||
void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan, bool execute_row_level_filter_in_prewhere)
|
||||
void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan)
|
||||
{
|
||||
auto & query = getSelectQuery();
|
||||
const Settings & settings = context->getSettingsRef();
|
||||
@ -1351,13 +1394,15 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
|
||||
if (storage)
|
||||
{
|
||||
/// Append columns from the table filter to required
|
||||
auto row_policy_filter = context->getRowPolicyCondition(table_id.getDatabaseName(), table_id.getTableName(), RowPolicy::SELECT_FILTER);
|
||||
if (row_policy_filter)
|
||||
ActionsDAG * row_policy_filter = nullptr;
|
||||
if (expressions.filter_info)
|
||||
row_policy_filter = expressions.filter_info->actions.get();
|
||||
// else if (expressions.prewhere_info && expressions.prewhere_info->row_level_filter_actions)
|
||||
// row_policy_filter = expressions.prewhere_info->row_level_filter_actions.get();
|
||||
|
||||
if (expressions.filter_info)
|
||||
{
|
||||
auto initial_required_columns = required_columns;
|
||||
ActionsDAGPtr actions_dag;
|
||||
generateFilterActions(actions_dag, row_policy_filter, initial_required_columns);
|
||||
auto required_columns_from_filter = actions_dag->getRequiredColumns();
|
||||
auto required_columns_from_filter = expressions.filter_info->actions->getRequiredColumns();
|
||||
|
||||
for (const auto & column : required_columns_from_filter)
|
||||
{
|
||||
@ -1394,7 +1439,10 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
|
||||
if (prewhere_info)
|
||||
{
|
||||
/// Get some columns directly from PREWHERE expression actions
|
||||
auto prewhere_required_columns = prewhere_info->prewhere_actions->getRequiredColumns().getNames();
|
||||
auto prewhere_required_columns = (
|
||||
prewhere_info->row_level_filter_actions ?
|
||||
prewhere_info->row_level_filter_actions :
|
||||
prewhere_info->prewhere_actions)->getRequiredColumns().getNames();
|
||||
required_columns_from_prewhere.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
|
||||
}
|
||||
|
||||
@ -1605,31 +1653,18 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
|
||||
{
|
||||
query_info.prewhere_info = std::make_shared<PrewhereInfo>();
|
||||
|
||||
if (expressions.filter_info && execute_row_level_filter_in_prewhere)
|
||||
{
|
||||
query_info.prewhere_info->filter_info = std::make_shared<FilterInfo>();
|
||||
|
||||
if (alias_actions)
|
||||
query_info.prewhere_info->filter_info->alias_actions = std::make_shared<ExpressionActions>(std::move(alias_actions));
|
||||
|
||||
if (expressions.filter_info->actions)
|
||||
query_info.prewhere_info->filter_info->actions = std::make_shared<ExpressionActions>(expressions.filter_info->actions);
|
||||
|
||||
query_info.prewhere_info->filter_info->column_name = expressions.filter_info->column_name;
|
||||
query_info.prewhere_info->filter_info->do_remove_column = expressions.filter_info->do_remove_column;
|
||||
}
|
||||
query_info.prewhere_info->prewhere_actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions);
|
||||
|
||||
if (prewhere_info->row_level_filter_actions)
|
||||
query_info.prewhere_info->row_level_filter = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter_actions);
|
||||
if (prewhere_info->alias_actions)
|
||||
query_info.prewhere_info->alias_actions = std::make_shared<ExpressionActions>(prewhere_info->alias_actions);
|
||||
|
||||
if (prewhere_info->prewhere_actions)
|
||||
query_info.prewhere_info->prewhere_actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions);
|
||||
|
||||
if (prewhere_info->remove_columns_actions)
|
||||
query_info.prewhere_info->remove_columns_actions = std::make_shared<ExpressionActions>(prewhere_info->remove_columns_actions);
|
||||
|
||||
query_info.prewhere_info->prewhere_column_name = prewhere_info->prewhere_column_name;
|
||||
query_info.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column;
|
||||
query_info.prewhere_info->row_level_column_name = prewhere_info->row_level_column_name;
|
||||
query_info.prewhere_info->need_filter = prewhere_info->need_filter;
|
||||
}
|
||||
|
||||
|
@ -108,7 +108,7 @@ private:
|
||||
|
||||
/// Different stages of query execution.
|
||||
|
||||
void executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan, bool execute_row_level_filter_in_prewhere);
|
||||
void executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan);
|
||||
void executeWhere(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool remove_filter);
|
||||
void executeAggregation(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info);
|
||||
void executeMergeAggregated(QueryPlan & query_plan, bool overflow_row, bool final);
|
||||
|
@ -46,16 +46,8 @@ Block getHeaderForProcessingStage(
|
||||
{
|
||||
auto & prewhere_info = *query_info.prewhere_info;
|
||||
|
||||
if (prewhere_info.filter_info)
|
||||
{
|
||||
auto & filter_info = *prewhere_info.filter_info;
|
||||
|
||||
if (filter_info.actions)
|
||||
filter_info.actions->execute(header);
|
||||
|
||||
if (filter_info.do_remove_column)
|
||||
header.erase(filter_info.column_name);
|
||||
}
|
||||
if (prewhere_info.row_level_filter)
|
||||
prewhere_info.row_level_filter->execute(header);
|
||||
|
||||
if (prewhere_info.prewhere_actions)
|
||||
prewhere_info.prewhere_actions->execute(header);
|
||||
|
@ -334,38 +334,30 @@ void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const P
|
||||
{
|
||||
if (prewhere_info)
|
||||
{
|
||||
if (prewhere_info->filter_info)
|
||||
{
|
||||
auto & filter_info = *prewhere_info->filter_info;
|
||||
|
||||
if (filter_info.alias_actions)
|
||||
filter_info.alias_actions->execute(block);
|
||||
|
||||
if (filter_info.actions)
|
||||
filter_info.actions->execute(block);
|
||||
|
||||
auto & filter_column = block.getByName(filter_info.column_name);
|
||||
if (!filter_column.type->canBeUsedInBooleanContext())
|
||||
{
|
||||
throw Exception("Invalid type for row-level security filter: " + filter_column.type->getName(),
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
if (filter_info.do_remove_column)
|
||||
block.erase(filter_info.column_name);
|
||||
else
|
||||
{
|
||||
auto & ctn = block.getByName(filter_info.column_name);
|
||||
ctn.column = ctn.type->createColumnConst(block.rows(), 1u)->convertToFullColumnIfConst();
|
||||
}
|
||||
}
|
||||
std::cerr << "0: " << block.dumpStructure() << std::endl;
|
||||
|
||||
if (prewhere_info->alias_actions)
|
||||
prewhere_info->alias_actions->execute(block);
|
||||
|
||||
std::cerr << "1: " << block.dumpStructure() << std::endl;
|
||||
|
||||
if (prewhere_info->row_level_filter)
|
||||
{
|
||||
prewhere_info->row_level_filter->execute(block);
|
||||
auto & row_level_column = block.getByName(prewhere_info->row_level_column_name);
|
||||
if (!row_level_column.type->canBeUsedInBooleanContext())
|
||||
{
|
||||
throw Exception("Invalid type for filter in PREWHERE: " + row_level_column.type->getName(),
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
}
|
||||
std::cerr << "2: " << block.dumpStructure() << std::endl;
|
||||
|
||||
if (prewhere_info->prewhere_actions)
|
||||
prewhere_info->prewhere_actions->execute(block);
|
||||
|
||||
std::cerr << "3: " << block.dumpStructure() << std::endl;
|
||||
|
||||
auto & prewhere_column = block.getByName(prewhere_info->prewhere_column_name);
|
||||
if (!prewhere_column.type->canBeUsedInBooleanContext())
|
||||
{
|
||||
@ -380,6 +372,8 @@ void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const P
|
||||
auto & ctn = block.getByName(prewhere_info->prewhere_column_name);
|
||||
ctn.column = ctn.type->createColumnConst(block.rows(), 1u)->convertToFullColumnIfConst();
|
||||
}
|
||||
|
||||
std::cerr << "4: " << block.dumpStructure() << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -269,30 +269,12 @@ MergeTreeReadTaskColumns getReadTaskColumns(
|
||||
|
||||
if (prewhere_info)
|
||||
{
|
||||
if (prewhere_info->filter_info)
|
||||
{
|
||||
if (prewhere_info->filter_info->alias_actions)
|
||||
{
|
||||
const auto required_column_names = prewhere_info->filter_info->alias_actions->getRequiredColumns();
|
||||
pre_column_names.insert(pre_column_names.end(), required_column_names.begin(), required_column_names.end());
|
||||
}
|
||||
else if (prewhere_info->filter_info->actions)
|
||||
{
|
||||
const auto required_column_names = prewhere_info->filter_info->actions->getRequiredColumns();
|
||||
pre_column_names.insert(pre_column_names.end(), required_column_names.begin(), required_column_names.end());
|
||||
}
|
||||
}
|
||||
|
||||
if (prewhere_info->alias_actions)
|
||||
{
|
||||
const auto required_column_names = prewhere_info->alias_actions->getRequiredColumns();
|
||||
pre_column_names.insert(pre_column_names.end(), required_column_names.begin(), required_column_names.end());
|
||||
}
|
||||
pre_column_names = prewhere_info->alias_actions->getRequiredColumns();
|
||||
else if (prewhere_info->row_level_filter)
|
||||
pre_column_names = prewhere_info->row_level_filter->getRequiredColumns();
|
||||
else if (prewhere_info->prewhere_actions)
|
||||
{
|
||||
const auto required_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
|
||||
pre_column_names.insert(pre_column_names.end(), required_column_names.begin(), required_column_names.end());
|
||||
}
|
||||
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
|
||||
|
||||
if (pre_column_names.empty())
|
||||
pre_column_names.push_back(column_names[0]);
|
||||
|
@ -561,18 +561,12 @@ MergeTreeRangeReader::MergeTreeRangeReader(
|
||||
|
||||
if (prewhere_info)
|
||||
{
|
||||
if (prewhere_info->filter_info)
|
||||
{
|
||||
if (prewhere_info->filter_info->actions)
|
||||
prewhere_info->filter_info->actions->execute(sample_block, true);
|
||||
|
||||
if (prewhere_info->filter_info->do_remove_column)
|
||||
sample_block.erase(prewhere_info->filter_info->column_name);
|
||||
}
|
||||
|
||||
if (prewhere_info->alias_actions)
|
||||
prewhere_info->alias_actions->execute(sample_block, true);
|
||||
|
||||
if (prewhere_info->row_level_filter)
|
||||
prewhere_info->row_level_filter->execute(sample_block, true);
|
||||
|
||||
if (prewhere_info->prewhere_actions)
|
||||
prewhere_info->prewhere_actions->execute(sample_block, true);
|
||||
|
||||
@ -897,31 +891,20 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
|
||||
for (auto name_and_type = header.begin(); pos < num_columns; ++pos, ++name_and_type)
|
||||
block.insert({result.columns[pos], name_and_type->type, name_and_type->name});
|
||||
|
||||
if (prewhere_info->filter_info)
|
||||
{
|
||||
if (prewhere_info->filter_info->alias_actions)
|
||||
prewhere_info->filter_info->alias_actions->execute(block);
|
||||
|
||||
if (prewhere_info->filter_info->actions)
|
||||
prewhere_info->filter_info->actions->execute(block);
|
||||
|
||||
const auto filter_column_pos = block.getPositionByName(prewhere_info->filter_info->column_name);
|
||||
result.addFilter(block.getByPosition(filter_column_pos).column);
|
||||
|
||||
if (prewhere_info->filter_info->do_remove_column)
|
||||
block.erase(prewhere_info->filter_info->column_name);
|
||||
else
|
||||
block.getByPosition(filter_column_pos).column = block.getByPosition(filter_column_pos).type->createColumnConst(result.num_rows, 1);
|
||||
}
|
||||
|
||||
if (prewhere_info->alias_actions)
|
||||
prewhere_info->alias_actions->execute(block);
|
||||
|
||||
/// Columns might be projected out. We need to store them here so that default columns can be evaluated later.
|
||||
result.block_before_prewhere = block;
|
||||
|
||||
if (prewhere_info->prewhere_actions)
|
||||
prewhere_info->prewhere_actions->execute(block);
|
||||
if (prewhere_info->row_level_filter)
|
||||
{
|
||||
prewhere_info->row_level_filter->execute(block);
|
||||
const auto filter_column_pos = block.getPositionByName(prewhere_info->row_level_column_name);
|
||||
result.addFilter(block.getByPosition(filter_column_pos).column);
|
||||
}
|
||||
|
||||
prewhere_info->prewhere_actions->execute(block);
|
||||
|
||||
prewhere_column_pos = block.getPositionByName(prewhere_info->prewhere_column_name);
|
||||
result.addFilter(block.getByPosition(prewhere_column_pos).column);
|
||||
@ -943,7 +926,7 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
|
||||
if (result.totalRowsPerGranule() == 0)
|
||||
result.setFilterConstFalse();
|
||||
/// If we need to filter in PREWHERE
|
||||
else if (prewhere_info->need_filter || result.need_filter || prewhere_info->remove_prewhere_column)
|
||||
else if (prewhere_info->need_filter || result.need_filter || prewhere_info->row_level_filter)
|
||||
{
|
||||
/// If there is a filter and without optimized
|
||||
if (result.getFilter() && last_reader_in_chain)
|
||||
|
@ -41,14 +41,16 @@ using ClusterPtr = std::shared_ptr<Cluster>;
|
||||
|
||||
struct PrewhereInfo
|
||||
{
|
||||
/// Information about the preliminary filter expression, if any.
|
||||
FilterInfoPtr filter_info;
|
||||
/// Actions which are executed in order to alias columns are used for prewhere actions.
|
||||
ExpressionActionsPtr alias_actions;
|
||||
/// Actions for row level security filter. Applied separately before prewhere_actions.
|
||||
/// This actions are separate because prewhere condition should not be executed over filtered rows.
|
||||
ExpressionActionsPtr row_level_filter;
|
||||
/// Actions which are executed on block in order to get filter column for prewhere step.
|
||||
ExpressionActionsPtr prewhere_actions;
|
||||
/// Actions which are executed after reading from storage in order to remove unused columns.
|
||||
ExpressionActionsPtr remove_columns_actions;
|
||||
String row_level_column_name;
|
||||
String prewhere_column_name;
|
||||
bool remove_prewhere_column = false;
|
||||
bool need_filter = false;
|
||||
@ -58,8 +60,10 @@ struct PrewhereInfo
|
||||
struct PrewhereDAGInfo
|
||||
{
|
||||
ActionsDAGPtr alias_actions;
|
||||
ActionsDAGPtr row_level_filter_actions;
|
||||
ActionsDAGPtr prewhere_actions;
|
||||
ActionsDAGPtr remove_columns_actions;
|
||||
String row_level_column_name;
|
||||
String prewhere_column_name;
|
||||
bool remove_prewhere_column = false;
|
||||
bool need_filter = false;
|
||||
|
@ -321,28 +321,6 @@ void StorageBuffer::read(
|
||||
{
|
||||
if (query_info.prewhere_info)
|
||||
{
|
||||
if (query_info.prewhere_info->filter_info)
|
||||
{
|
||||
if (query_info.prewhere_info->filter_info->alias_actions)
|
||||
{
|
||||
pipe_from_buffers.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<ExpressionTransform>(
|
||||
header,
|
||||
query_info.prewhere_info->filter_info->alias_actions);
|
||||
});
|
||||
}
|
||||
|
||||
pipe_from_buffers.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<FilterTransform>(
|
||||
header,
|
||||
query_info.prewhere_info->filter_info->actions,
|
||||
query_info.prewhere_info->filter_info->column_name,
|
||||
query_info.prewhere_info->filter_info->do_remove_column);
|
||||
});
|
||||
}
|
||||
|
||||
if (query_info.prewhere_info->alias_actions)
|
||||
{
|
||||
pipe_from_buffers.addSimpleTransform([&](const Block & header)
|
||||
@ -353,6 +331,18 @@ void StorageBuffer::read(
|
||||
});
|
||||
}
|
||||
|
||||
if (query_info.prewhere_info->row_level_filter)
|
||||
{
|
||||
pipe_from_buffers.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<FilterTransform>(
|
||||
header,
|
||||
query_info.prewhere_info->row_level_filter,
|
||||
query_info.prewhere_info->row_level_column_name,
|
||||
false);
|
||||
});
|
||||
}
|
||||
|
||||
pipe_from_buffers.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<FilterTransform>(
|
||||
|
Loading…
Reference in New Issue
Block a user