mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #25719 from ClickHouse/remove-PrewhereDAGInfo
Remove PrewhereDAGInfo.
This commit is contained in:
commit
d5bd2b1fa9
@ -1504,7 +1504,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
|
||||
|
||||
if (auto actions = query_analyzer.appendPrewhere(chain, !first_stage, additional_required_columns_after_prewhere))
|
||||
{
|
||||
prewhere_info = std::make_shared<PrewhereDAGInfo>(actions, query.prewhere()->getColumnName(settings));
|
||||
prewhere_info = std::make_shared<PrewhereInfo>(actions, query.prewhere()->getColumnName(settings));
|
||||
|
||||
if (allowEarlyConstantFolding(*prewhere_info->prewhere_actions, settings))
|
||||
{
|
||||
@ -1725,7 +1725,6 @@ void ExpressionAnalysisResult::checkActions() const
|
||||
|
||||
check_actions(prewhere_info->prewhere_actions);
|
||||
check_actions(prewhere_info->alias_actions);
|
||||
check_actions(prewhere_info->remove_columns_actions);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -239,7 +239,7 @@ struct ExpressionAnalysisResult
|
||||
/// Columns will be removed after prewhere actions execution.
|
||||
NameSet columns_to_remove_after_prewhere;
|
||||
|
||||
PrewhereDAGInfoPtr prewhere_info;
|
||||
PrewhereInfoPtr prewhere_info;
|
||||
FilterDAGInfoPtr filter_info;
|
||||
ConstantFilterDescription prewhere_constant_filter_description;
|
||||
ConstantFilterDescription where_constant_filter_description;
|
||||
|
@ -958,11 +958,11 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
|
||||
|
||||
if (expressions.prewhere_info)
|
||||
{
|
||||
if (expressions.prewhere_info->row_level_filter_actions)
|
||||
if (expressions.prewhere_info->row_level_filter)
|
||||
{
|
||||
auto row_level_filter_step = std::make_unique<FilterStep>(
|
||||
query_plan.getCurrentDataStream(),
|
||||
expressions.prewhere_info->row_level_filter_actions,
|
||||
expressions.prewhere_info->row_level_filter,
|
||||
expressions.prewhere_info->row_level_column_name,
|
||||
false);
|
||||
|
||||
@ -978,18 +978,6 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
|
||||
|
||||
prewhere_step->setStepDescription("PREWHERE");
|
||||
query_plan.addStep(std::move(prewhere_step));
|
||||
|
||||
// To remove additional columns in dry run
|
||||
// For example, sample column which can be removed in this stage
|
||||
// TODO There seems to be no place initializing remove_columns_actions
|
||||
if (expressions.prewhere_info->remove_columns_actions)
|
||||
{
|
||||
auto remove_columns = std::make_unique<ExpressionStep>(
|
||||
query_plan.getCurrentDataStream(), expressions.prewhere_info->remove_columns_actions);
|
||||
|
||||
remove_columns->setStepDescription("Remove unnecessary columns after PREWHERE");
|
||||
query_plan.addStep(std::move(remove_columns));
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -1479,33 +1467,29 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(
|
||||
|
||||
if (prewhere_info.alias_actions)
|
||||
{
|
||||
pipe.addSimpleTransform(
|
||||
[&](const Block & header) { return std::make_shared<ExpressionTransform>(header, prewhere_info.alias_actions); });
|
||||
pipe.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<ExpressionTransform>(header,
|
||||
std::make_shared<ExpressionActions>(prewhere_info.alias_actions));
|
||||
});
|
||||
}
|
||||
|
||||
if (prewhere_info.row_level_filter)
|
||||
{
|
||||
pipe.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<FilterTransform>(header, prewhere_info.row_level_filter, prewhere_info.row_level_column_name, true);
|
||||
return std::make_shared<FilterTransform>(header,
|
||||
std::make_shared<ExpressionActions>(prewhere_info.row_level_filter),
|
||||
prewhere_info.row_level_column_name, true);
|
||||
});
|
||||
}
|
||||
|
||||
pipe.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<FilterTransform>(
|
||||
header, prewhere_info.prewhere_actions, prewhere_info.prewhere_column_name, prewhere_info.remove_prewhere_column);
|
||||
header, std::make_shared<ExpressionActions>(prewhere_info.prewhere_actions),
|
||||
prewhere_info.prewhere_column_name, prewhere_info.remove_prewhere_column);
|
||||
});
|
||||
|
||||
// To remove additional columns
|
||||
// In some cases, we did not read any marks so that the pipeline.streams is empty
|
||||
// Thus, some columns in prewhere are not removed as expected
|
||||
// This leads to mismatched header in distributed table
|
||||
if (prewhere_info.remove_columns_actions)
|
||||
{
|
||||
pipe.addSimpleTransform(
|
||||
[&](const Block & header) { return std::make_shared<ExpressionTransform>(header, prewhere_info.remove_columns_actions); });
|
||||
}
|
||||
}
|
||||
|
||||
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
|
||||
@ -1560,7 +1544,7 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
|
||||
if (does_storage_support_prewhere && settings.optimize_move_to_prewhere)
|
||||
{
|
||||
/// Execute row level filter in prewhere as a part of "move to prewhere" optimization.
|
||||
expressions.prewhere_info = std::make_shared<PrewhereDAGInfo>(
|
||||
expressions.prewhere_info = std::make_shared<PrewhereInfo>(
|
||||
std::move(expressions.filter_info->actions),
|
||||
std::move(expressions.filter_info->column_name));
|
||||
expressions.prewhere_info->prewhere_actions->projectInput(false);
|
||||
@ -1572,9 +1556,9 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
|
||||
else
|
||||
{
|
||||
/// Add row level security actions to prewhere.
|
||||
expressions.prewhere_info->row_level_filter_actions = std::move(expressions.filter_info->actions);
|
||||
expressions.prewhere_info->row_level_filter = std::move(expressions.filter_info->actions);
|
||||
expressions.prewhere_info->row_level_column_name = std::move(expressions.filter_info->column_name);
|
||||
expressions.prewhere_info->row_level_filter_actions->projectInput(false);
|
||||
expressions.prewhere_info->row_level_filter->projectInput(false);
|
||||
expressions.filter_info = nullptr;
|
||||
}
|
||||
}
|
||||
@ -1613,9 +1597,9 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
|
||||
auto prewhere_required_columns = prewhere_info->prewhere_actions->getRequiredColumns().getNames();
|
||||
required_columns_from_prewhere.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
|
||||
|
||||
if (prewhere_info->row_level_filter_actions)
|
||||
if (prewhere_info->row_level_filter)
|
||||
{
|
||||
auto row_level_required_columns = prewhere_info->row_level_filter_actions->getRequiredColumns().getNames();
|
||||
auto row_level_required_columns = prewhere_info->row_level_filter->getRequiredColumns().getNames();
|
||||
required_columns_from_prewhere.insert(row_level_required_columns.begin(), row_level_required_columns.end());
|
||||
}
|
||||
}
|
||||
@ -1898,28 +1882,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
|
||||
auto & prewhere_info = analysis_result.prewhere_info;
|
||||
|
||||
if (prewhere_info)
|
||||
{
|
||||
auto actions_settings = ExpressionActionsSettings::fromContext(context, CompileExpressions::yes);
|
||||
|
||||
query_info.prewhere_info = std::make_shared<PrewhereInfo>();
|
||||
query_info.prewhere_info->prewhere_actions
|
||||
= std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions, actions_settings);
|
||||
|
||||
if (prewhere_info->row_level_filter_actions)
|
||||
query_info.prewhere_info->row_level_filter
|
||||
= std::make_shared<ExpressionActions>(prewhere_info->row_level_filter_actions, actions_settings);
|
||||
if (prewhere_info->alias_actions)
|
||||
query_info.prewhere_info->alias_actions
|
||||
= std::make_shared<ExpressionActions>(prewhere_info->alias_actions, actions_settings);
|
||||
if (prewhere_info->remove_columns_actions)
|
||||
query_info.prewhere_info->remove_columns_actions
|
||||
= std::make_shared<ExpressionActions>(prewhere_info->remove_columns_actions, actions_settings);
|
||||
|
||||
query_info.prewhere_info->prewhere_column_name = prewhere_info->prewhere_column_name;
|
||||
query_info.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column;
|
||||
query_info.prewhere_info->row_level_column_name = prewhere_info->row_level_column_name;
|
||||
query_info.prewhere_info->need_filter = prewhere_info->need_filter;
|
||||
}
|
||||
query_info.prewhere_info = prewhere_info;
|
||||
|
||||
/// Create optimizer with prepared actions.
|
||||
/// Maybe we will need to calc input_order_info later, e.g. while reading from StorageMerge.
|
||||
|
@ -98,12 +98,12 @@ Block getHeaderForProcessingStage(
|
||||
|
||||
if (prewhere_info.row_level_filter)
|
||||
{
|
||||
prewhere_info.row_level_filter->execute(header);
|
||||
header = prewhere_info.row_level_filter->updateHeader(std::move(header));
|
||||
header.erase(prewhere_info.row_level_column_name);
|
||||
}
|
||||
|
||||
if (prewhere_info.prewhere_actions)
|
||||
prewhere_info.prewhere_actions->execute(header);
|
||||
header = prewhere_info.prewhere_actions->updateHeader(std::move(header));
|
||||
|
||||
if (prewhere_info.remove_prewhere_column)
|
||||
header.erase(prewhere_info.prewhere_column_name);
|
||||
|
@ -94,6 +94,7 @@ ReadFromMergeTree::ReadFromMergeTree(
|
||||
, data(data_)
|
||||
, query_info(query_info_)
|
||||
, prewhere_info(getPrewhereInfo(query_info))
|
||||
, actions_settings(ExpressionActionsSettings::fromContext(context_))
|
||||
, metadata_snapshot(std::move(metadata_snapshot_))
|
||||
, metadata_snapshot_base(std::move(metadata_snapshot_base_))
|
||||
, context(std::move(context_))
|
||||
@ -157,7 +158,7 @@ Pipe ReadFromMergeTree::readFromPool(
|
||||
i, pool, min_marks_for_concurrent_read, max_block_size,
|
||||
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
|
||||
data, metadata_snapshot, use_uncompressed_cache,
|
||||
prewhere_info, reader_settings, virt_column_names);
|
||||
prewhere_info, actions_settings, reader_settings, virt_column_names);
|
||||
|
||||
if (i == 0)
|
||||
{
|
||||
@ -180,7 +181,7 @@ ProcessorPtr ReadFromMergeTree::createSource(
|
||||
return std::make_shared<TSource>(
|
||||
data, metadata_snapshot, part.data_part, max_block_size, preferred_block_size_bytes,
|
||||
preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache,
|
||||
prewhere_info, true, reader_settings, virt_column_names, part.part_index_in_query);
|
||||
prewhere_info, actions_settings, true, reader_settings, virt_column_names, part.part_index_in_query);
|
||||
}
|
||||
|
||||
Pipe ReadFromMergeTree::readInOrder(
|
||||
|
@ -90,6 +90,7 @@ private:
|
||||
const MergeTreeData & data;
|
||||
SelectQueryInfo query_info;
|
||||
PrewhereInfoPtr prewhere_info;
|
||||
ExpressionActionsSettings actions_settings;
|
||||
|
||||
StorageMetadataPtr metadata_snapshot;
|
||||
StorageMetadataPtr metadata_snapshot_base;
|
||||
|
@ -198,7 +198,7 @@ NameDependencies IStorage::getDependentViewsByColumn(ContextPtr context) const
|
||||
return name_deps;
|
||||
}
|
||||
|
||||
std::string PrewhereDAGInfo::dump() const
|
||||
std::string PrewhereInfo::dump() const
|
||||
{
|
||||
WriteBufferFromOwnString ss;
|
||||
ss << "PrewhereDagInfo\n";
|
||||
@ -213,11 +213,6 @@ std::string PrewhereDAGInfo::dump() const
|
||||
ss << "prewhere_actions " << prewhere_actions->dumpDAG() << "\n";
|
||||
}
|
||||
|
||||
if (remove_columns_actions)
|
||||
{
|
||||
ss << "remove_columns_actions " << remove_columns_actions->dumpDAG() << "\n";
|
||||
}
|
||||
|
||||
ss << "remove_prewhere_column " << remove_prewhere_column
|
||||
<< ", need_filter " << need_filter << "\n";
|
||||
|
||||
|
@ -27,6 +27,7 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
|
||||
const MergeTreeData & storage_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const PrewhereInfoPtr & prewhere_info_,
|
||||
ExpressionActionsSettings actions_settings,
|
||||
UInt64 max_block_size_rows_,
|
||||
UInt64 preferred_block_size_bytes_,
|
||||
UInt64 preferred_max_column_in_block_size_bytes_,
|
||||
@ -50,6 +51,23 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
|
||||
for (auto it = virt_column_names.rbegin(); it != virt_column_names.rend(); ++it)
|
||||
if (header_without_virtual_columns.has(*it))
|
||||
header_without_virtual_columns.erase(*it);
|
||||
|
||||
if (prewhere_info)
|
||||
{
|
||||
prewhere_actions = std::make_unique<PrewhereExprInfo>();
|
||||
if (prewhere_info->alias_actions)
|
||||
prewhere_actions->alias_actions = std::make_shared<ExpressionActions>(prewhere_info->alias_actions, actions_settings);
|
||||
|
||||
if (prewhere_info->row_level_filter)
|
||||
prewhere_actions->row_level_filter = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter, actions_settings);
|
||||
|
||||
prewhere_actions->prewhere_actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions, actions_settings);
|
||||
|
||||
prewhere_actions->row_level_column_name = prewhere_info->row_level_column_name;
|
||||
prewhere_actions->prewhere_column_name = prewhere_info->prewhere_column_name;
|
||||
prewhere_actions->remove_prewhere_column = prewhere_info->remove_prewhere_column;
|
||||
prewhere_actions->need_filter = prewhere_info->need_filter;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -79,14 +97,14 @@ void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & cu
|
||||
{
|
||||
if (reader->getColumns().empty())
|
||||
{
|
||||
current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, true);
|
||||
current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), true);
|
||||
}
|
||||
else
|
||||
{
|
||||
MergeTreeRangeReader * pre_reader_ptr = nullptr;
|
||||
if (pre_reader != nullptr)
|
||||
{
|
||||
current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, false);
|
||||
current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), false);
|
||||
pre_reader_ptr = &current_task.pre_range_reader;
|
||||
}
|
||||
|
||||
@ -397,16 +415,17 @@ void MergeTreeBaseSelectProcessor::injectVirtualColumns(
|
||||
chunk.setColumns(columns, num_rows);
|
||||
}
|
||||
|
||||
void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info)
|
||||
Block MergeTreeBaseSelectProcessor::transformHeader(
|
||||
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns)
|
||||
{
|
||||
if (prewhere_info)
|
||||
{
|
||||
if (prewhere_info->alias_actions)
|
||||
prewhere_info->alias_actions->execute(block);
|
||||
block = prewhere_info->alias_actions->updateHeader(std::move(block));
|
||||
|
||||
if (prewhere_info->row_level_filter)
|
||||
{
|
||||
prewhere_info->row_level_filter->execute(block);
|
||||
block = prewhere_info->row_level_filter->updateHeader(std::move(block));
|
||||
auto & row_level_column = block.getByName(prewhere_info->row_level_column_name);
|
||||
if (!row_level_column.type->canBeUsedInBooleanContext())
|
||||
{
|
||||
@ -418,7 +437,7 @@ void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const P
|
||||
}
|
||||
|
||||
if (prewhere_info->prewhere_actions)
|
||||
prewhere_info->prewhere_actions->execute(block);
|
||||
block = prewhere_info->prewhere_actions->updateHeader(std::move(block));
|
||||
|
||||
auto & prewhere_column = block.getByName(prewhere_info->prewhere_column_name);
|
||||
if (!prewhere_column.type->canBeUsedInBooleanContext())
|
||||
@ -441,12 +460,7 @@ void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const P
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Block MergeTreeBaseSelectProcessor::transformHeader(
|
||||
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns)
|
||||
{
|
||||
executePrewhereActions(block, prewhere_info);
|
||||
injectVirtualColumns(block, nullptr, partition_value_type, virtual_columns);
|
||||
return block;
|
||||
}
|
||||
|
@ -13,7 +13,7 @@ namespace DB
|
||||
class IMergeTreeReader;
|
||||
class UncompressedCache;
|
||||
class MarkCache;
|
||||
|
||||
struct PrewhereExprInfo;
|
||||
|
||||
/// Base class for MergeTreeThreadSelectProcessor and MergeTreeSelectProcessor
|
||||
class MergeTreeBaseSelectProcessor : public SourceWithProgress
|
||||
@ -24,6 +24,7 @@ public:
|
||||
const MergeTreeData & storage_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const PrewhereInfoPtr & prewhere_info_,
|
||||
ExpressionActionsSettings actions_settings,
|
||||
UInt64 max_block_size_rows_,
|
||||
UInt64 preferred_block_size_bytes_,
|
||||
UInt64 preferred_max_column_in_block_size_bytes_,
|
||||
@ -36,8 +37,6 @@ public:
|
||||
static Block transformHeader(
|
||||
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns);
|
||||
|
||||
static void executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info);
|
||||
|
||||
protected:
|
||||
Chunk generate() final;
|
||||
|
||||
@ -61,6 +60,7 @@ protected:
|
||||
StorageMetadataPtr metadata_snapshot;
|
||||
|
||||
PrewhereInfoPtr prewhere_info;
|
||||
std::unique_ptr<PrewhereExprInfo> prewhere_actions;
|
||||
|
||||
UInt64 max_block_size_rows;
|
||||
UInt64 preferred_block_size_bytes;
|
||||
|
@ -272,16 +272,16 @@ MergeTreeReadTaskColumns getReadTaskColumns(
|
||||
if (prewhere_info)
|
||||
{
|
||||
if (prewhere_info->alias_actions)
|
||||
pre_column_names = prewhere_info->alias_actions->getRequiredColumns();
|
||||
pre_column_names = prewhere_info->alias_actions->getRequiredColumnsNames();
|
||||
else
|
||||
{
|
||||
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
|
||||
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames();
|
||||
|
||||
if (prewhere_info->row_level_filter)
|
||||
{
|
||||
NameSet names(pre_column_names.begin(), pre_column_names.end());
|
||||
|
||||
for (auto & name : prewhere_info->row_level_filter->getRequiredColumns())
|
||||
for (auto & name : prewhere_info->row_level_filter->getRequiredColumnsNames())
|
||||
{
|
||||
if (names.count(name) == 0)
|
||||
pre_column_names.push_back(name);
|
||||
|
@ -3945,15 +3945,9 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
|
||||
|
||||
if (analysis_result.prewhere_info)
|
||||
{
|
||||
const auto & prewhere_info = analysis_result.prewhere_info;
|
||||
candidate.prewhere_info = std::make_shared<PrewhereInfo>();
|
||||
candidate.prewhere_info->prewhere_column_name = prewhere_info->prewhere_column_name;
|
||||
candidate.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column;
|
||||
// std::cerr << fmt::format("remove prewhere column : {}", candidate.prewhere_info->remove_prewhere_column) << std::endl;
|
||||
candidate.prewhere_info->row_level_column_name = prewhere_info->row_level_column_name;
|
||||
candidate.prewhere_info->need_filter = prewhere_info->need_filter;
|
||||
candidate.prewhere_info = analysis_result.prewhere_info;
|
||||
|
||||
auto prewhere_actions = prewhere_info->prewhere_actions->clone();
|
||||
auto prewhere_actions = candidate.prewhere_info->prewhere_actions->clone();
|
||||
auto prewhere_required_columns = required_columns;
|
||||
// required_columns should not contain columns generated by prewhere
|
||||
for (const auto & column : prewhere_actions->getResultColumns())
|
||||
@ -3961,28 +3955,27 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
|
||||
// std::cerr << fmt::format("prewhere_actions = \n{}", prewhere_actions->dumpDAG()) << std::endl;
|
||||
// Prewhere_action should not add missing keys.
|
||||
prewhere_required_columns = prewhere_actions->foldActionsByProjection(
|
||||
prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->prewhere_column_name, false);
|
||||
prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->prewhere_column_name, false);
|
||||
// std::cerr << fmt::format("prewhere_actions = \n{}", prewhere_actions->dumpDAG()) << std::endl;
|
||||
// std::cerr << fmt::format("prewhere_required_columns = \n{}", fmt::join(prewhere_required_columns, ", ")) << std::endl;
|
||||
if (prewhere_required_columns.empty())
|
||||
return false;
|
||||
candidate.prewhere_info->prewhere_actions = std::make_shared<ExpressionActions>(prewhere_actions, actions_settings);
|
||||
candidate.prewhere_info->prewhere_actions = prewhere_actions;
|
||||
|
||||
if (prewhere_info->row_level_filter_actions)
|
||||
if (candidate.prewhere_info->row_level_filter)
|
||||
{
|
||||
auto row_level_filter_actions = prewhere_info->row_level_filter_actions->clone();
|
||||
auto row_level_filter_actions = candidate.prewhere_info->row_level_filter->clone();
|
||||
prewhere_required_columns = row_level_filter_actions->foldActionsByProjection(
|
||||
prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->row_level_column_name, false);
|
||||
prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->row_level_column_name, false);
|
||||
// std::cerr << fmt::format("row_level_filter_required_columns = \n{}", fmt::join(prewhere_required_columns, ", ")) << std::endl;
|
||||
if (prewhere_required_columns.empty())
|
||||
return false;
|
||||
candidate.prewhere_info->row_level_filter
|
||||
= std::make_shared<ExpressionActions>(row_level_filter_actions, actions_settings);
|
||||
candidate.prewhere_info->row_level_filter = row_level_filter_actions;
|
||||
}
|
||||
|
||||
if (prewhere_info->alias_actions)
|
||||
if (candidate.prewhere_info->alias_actions)
|
||||
{
|
||||
auto alias_actions = prewhere_info->alias_actions->clone();
|
||||
auto alias_actions = candidate.prewhere_info->alias_actions->clone();
|
||||
// std::cerr << fmt::format("alias_actions = \n{}", alias_actions->dumpDAG()) << std::endl;
|
||||
prewhere_required_columns
|
||||
= alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys, {}, false);
|
||||
@ -3990,7 +3983,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
|
||||
// std::cerr << fmt::format("alias_required_columns = \n{}", fmt::join(prewhere_required_columns, ", ")) << std::endl;
|
||||
if (prewhere_required_columns.empty())
|
||||
return false;
|
||||
candidate.prewhere_info->alias_actions = std::make_shared<ExpressionActions>(alias_actions, actions_settings);
|
||||
candidate.prewhere_info->alias_actions = alias_actions;
|
||||
}
|
||||
required_columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
|
||||
}
|
||||
|
@ -520,7 +520,7 @@ size_t MergeTreeRangeReader::ReadResult::countBytesInResultFilter(const IColumn:
|
||||
MergeTreeRangeReader::MergeTreeRangeReader(
|
||||
IMergeTreeReader * merge_tree_reader_,
|
||||
MergeTreeRangeReader * prev_reader_,
|
||||
const PrewhereInfoPtr & prewhere_info_,
|
||||
const PrewhereExprInfo * prewhere_info_,
|
||||
bool last_reader_in_chain_)
|
||||
: merge_tree_reader(merge_tree_reader_)
|
||||
, index_granularity(&(merge_tree_reader->data_part->index_granularity))
|
||||
|
@ -15,6 +15,25 @@ class MergeTreeIndexGranularity;
|
||||
struct PrewhereInfo;
|
||||
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
|
||||
|
||||
class ExpressionActions;
|
||||
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
|
||||
|
||||
/// The same as PrewhereInfo, but with ExpressionActions instead of ActionsDAG
|
||||
struct PrewhereExprInfo
|
||||
{
|
||||
/// Actions which are executed in order to alias the columns that are used by the prewhere actions.
|
||||
ExpressionActionsPtr alias_actions;
|
||||
/// Actions for row level security filter. Applied separately before prewhere_actions.
|
||||
/// These actions are separate because the prewhere condition should not be executed over filtered rows.
|
||||
ExpressionActionsPtr row_level_filter;
|
||||
/// Actions which are executed on block in order to get filter column for prewhere step.
|
||||
ExpressionActionsPtr prewhere_actions;
|
||||
String row_level_column_name;
|
||||
String prewhere_column_name;
|
||||
bool remove_prewhere_column = false;
|
||||
bool need_filter = false;
|
||||
};
|
||||
|
||||
/// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part.
|
||||
/// Stores reading state, which can be inside granule. Can skip rows in current granule and start reading from next mark.
|
||||
/// Used generally for reading number of rows less than index granularity to decrease cache misses for fat blocks.
|
||||
@ -24,7 +43,7 @@ public:
|
||||
MergeTreeRangeReader(
|
||||
IMergeTreeReader * merge_tree_reader_,
|
||||
MergeTreeRangeReader * prev_reader_,
|
||||
const PrewhereInfoPtr & prewhere_info_,
|
||||
const PrewhereExprInfo * prewhere_info_,
|
||||
bool last_reader_in_chain_);
|
||||
|
||||
MergeTreeRangeReader() = default;
|
||||
@ -217,7 +236,7 @@ private:
|
||||
IMergeTreeReader * merge_tree_reader = nullptr;
|
||||
const MergeTreeIndexGranularity * index_granularity = nullptr;
|
||||
MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader firstly.
|
||||
PrewhereInfoPtr prewhere_info;
|
||||
const PrewhereExprInfo * prewhere_info;
|
||||
|
||||
Stream stream;
|
||||
|
||||
|
@ -23,6 +23,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
|
||||
MarkRanges mark_ranges_,
|
||||
bool use_uncompressed_cache_,
|
||||
const PrewhereInfoPtr & prewhere_info_,
|
||||
ExpressionActionsSettings actions_settings,
|
||||
bool check_columns,
|
||||
const MergeTreeReaderSettings & reader_settings_,
|
||||
const Names & virt_column_names_,
|
||||
@ -31,7 +32,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
|
||||
:
|
||||
MergeTreeBaseSelectProcessor{
|
||||
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
|
||||
storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
|
||||
storage_, metadata_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_,
|
||||
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
|
||||
reader_settings_, use_uncompressed_cache_, virt_column_names_},
|
||||
required_columns{std::move(required_columns_)},
|
||||
|
@ -27,6 +27,7 @@ public:
|
||||
MarkRanges mark_ranges,
|
||||
bool use_uncompressed_cache,
|
||||
const PrewhereInfoPtr & prewhere_info,
|
||||
ExpressionActionsSettings actions_settings,
|
||||
bool check_columns,
|
||||
const MergeTreeReaderSettings & reader_settings,
|
||||
const Names & virt_column_names = {},
|
||||
|
@ -23,6 +23,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
|
||||
MarkRanges mark_ranges_,
|
||||
bool use_uncompressed_cache_,
|
||||
const PrewhereInfoPtr & prewhere_info_,
|
||||
ExpressionActionsSettings actions_settings,
|
||||
bool check_columns_,
|
||||
const MergeTreeReaderSettings & reader_settings_,
|
||||
const Names & virt_column_names_,
|
||||
@ -31,7 +32,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
|
||||
:
|
||||
MergeTreeBaseSelectProcessor{
|
||||
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
|
||||
storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
|
||||
storage_, metadata_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_,
|
||||
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
|
||||
reader_settings_, use_uncompressed_cache_, virt_column_names_},
|
||||
required_columns{std::move(required_columns_)},
|
||||
|
@ -27,6 +27,7 @@ public:
|
||||
MarkRanges mark_ranges,
|
||||
bool use_uncompressed_cache,
|
||||
const PrewhereInfoPtr & prewhere_info,
|
||||
ExpressionActionsSettings actions_settings,
|
||||
bool check_columns,
|
||||
const MergeTreeReaderSettings & reader_settings,
|
||||
const Names & virt_column_names = {},
|
||||
|
@ -19,11 +19,12 @@ MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcess
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const bool use_uncompressed_cache_,
|
||||
const PrewhereInfoPtr & prewhere_info_,
|
||||
ExpressionActionsSettings actions_settings,
|
||||
const MergeTreeReaderSettings & reader_settings_,
|
||||
const Names & virt_column_names_)
|
||||
:
|
||||
MergeTreeBaseSelectProcessor{
|
||||
pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
|
||||
pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_,
|
||||
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
|
||||
reader_settings_, use_uncompressed_cache_, virt_column_names_},
|
||||
thread{thread_},
|
||||
|
@ -25,7 +25,9 @@ public:
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const bool use_uncompressed_cache_,
|
||||
const PrewhereInfoPtr & prewhere_info_,
|
||||
ExpressionActionsSettings actions_settings,
|
||||
const MergeTreeReaderSettings & reader_settings_,
|
||||
|
||||
const Names & virt_column_names_);
|
||||
|
||||
String getName() const override { return "MergeTreeThread"; }
|
||||
|
@ -21,9 +21,6 @@ using ActionsDAGPtr = std::shared_ptr<ActionsDAG>;
|
||||
struct PrewhereInfo;
|
||||
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
|
||||
|
||||
struct PrewhereDAGInfo;
|
||||
using PrewhereDAGInfoPtr = std::shared_ptr<PrewhereDAGInfo>;
|
||||
|
||||
struct FilterInfo;
|
||||
using FilterInfoPtr = std::shared_ptr<FilterInfo>;
|
||||
|
||||
@ -45,34 +42,19 @@ using ClusterPtr = std::shared_ptr<Cluster>;
|
||||
struct PrewhereInfo
|
||||
{
|
||||
/// Actions which are executed in order to alias the columns that are used by the prewhere actions.
|
||||
ExpressionActionsPtr alias_actions;
|
||||
ActionsDAGPtr alias_actions;
|
||||
/// Actions for row level security filter. Applied separately before prewhere_actions.
|
||||
/// These actions are separate because the prewhere condition should not be executed over filtered rows.
|
||||
ExpressionActionsPtr row_level_filter;
|
||||
ActionsDAGPtr row_level_filter;
|
||||
/// Actions which are executed on block in order to get filter column for prewhere step.
|
||||
ExpressionActionsPtr prewhere_actions;
|
||||
/// Actions which are executed after reading from storage in order to remove unused columns.
|
||||
ExpressionActionsPtr remove_columns_actions;
|
||||
String row_level_column_name;
|
||||
String prewhere_column_name;
|
||||
bool remove_prewhere_column = false;
|
||||
bool need_filter = false;
|
||||
};
|
||||
|
||||
/// Same as PrewhereInfo, but with ActionsDAG.
|
||||
struct PrewhereDAGInfo
|
||||
{
|
||||
ActionsDAGPtr alias_actions;
|
||||
ActionsDAGPtr row_level_filter_actions;
|
||||
ActionsDAGPtr prewhere_actions;
|
||||
ActionsDAGPtr remove_columns_actions;
|
||||
String row_level_column_name;
|
||||
String prewhere_column_name;
|
||||
bool remove_prewhere_column = false;
|
||||
bool need_filter = false;
|
||||
|
||||
PrewhereDAGInfo() = default;
|
||||
explicit PrewhereDAGInfo(ActionsDAGPtr prewhere_actions_, String prewhere_column_name_)
|
||||
PrewhereInfo() = default;
|
||||
explicit PrewhereInfo(ActionsDAGPtr prewhere_actions_, String prewhere_column_name_)
|
||||
: prewhere_actions(std::move(prewhere_actions_)), prewhere_column_name(std::move(prewhere_column_name_)) {}
|
||||
|
||||
std::string dump() const;
|
||||
|
@ -369,13 +369,14 @@ void StorageBuffer::read(
|
||||
{
|
||||
if (query_info.prewhere_info)
|
||||
{
|
||||
auto actions_settings = ExpressionActionsSettings::fromContext(local_context);
|
||||
if (query_info.prewhere_info->alias_actions)
|
||||
{
|
||||
pipe_from_buffers.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<ExpressionTransform>(
|
||||
header,
|
||||
query_info.prewhere_info->alias_actions);
|
||||
std::make_shared<ExpressionActions>(query_info.prewhere_info->alias_actions, actions_settings));
|
||||
});
|
||||
}
|
||||
|
||||
@ -385,7 +386,7 @@ void StorageBuffer::read(
|
||||
{
|
||||
return std::make_shared<FilterTransform>(
|
||||
header,
|
||||
query_info.prewhere_info->row_level_filter,
|
||||
std::make_shared<ExpressionActions>(query_info.prewhere_info->row_level_filter, actions_settings),
|
||||
query_info.prewhere_info->row_level_column_name,
|
||||
false);
|
||||
});
|
||||
@ -395,7 +396,7 @@ void StorageBuffer::read(
|
||||
{
|
||||
return std::make_shared<FilterTransform>(
|
||||
header,
|
||||
query_info.prewhere_info->prewhere_actions,
|
||||
std::make_shared<ExpressionActions>(query_info.prewhere_info->prewhere_actions, actions_settings),
|
||||
query_info.prewhere_info->prewhere_column_name,
|
||||
query_info.prewhere_info->remove_prewhere_column);
|
||||
});
|
||||
|
Loading…
Reference in New Issue
Block a user