Remove PrewhereDAGInfo.

This commit is contained in:
Nikolai Kochetov 2021-06-25 17:49:28 +03:00
parent 1f07dd06fc
commit 6bc0a628cd
21 changed files with 108 additions and 134 deletions

View File

@ -1514,7 +1514,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
if (auto actions = query_analyzer.appendPrewhere(chain, !first_stage, additional_required_columns_after_prewhere))
{
prewhere_info = std::make_shared<PrewhereDAGInfo>(actions, query.prewhere()->getColumnName());
prewhere_info = std::make_shared<PrewhereInfo>(actions, query.prewhere()->getColumnName());
if (allowEarlyConstantFolding(*prewhere_info->prewhere_actions, settings))
{
@ -1734,7 +1734,6 @@ void ExpressionAnalysisResult::checkActions() const
check_actions(prewhere_info->prewhere_actions);
check_actions(prewhere_info->alias_actions);
check_actions(prewhere_info->remove_columns_actions);
}
}

View File

@ -239,7 +239,7 @@ struct ExpressionAnalysisResult
/// Columns will be removed after prewhere actions execution.
NameSet columns_to_remove_after_prewhere;
PrewhereDAGInfoPtr prewhere_info;
PrewhereInfoPtr prewhere_info;
FilterDAGInfoPtr filter_info;
ConstantFilterDescription prewhere_constant_filter_description;
ConstantFilterDescription where_constant_filter_description;

View File

@ -958,11 +958,11 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (expressions.prewhere_info)
{
if (expressions.prewhere_info->row_level_filter_actions)
if (expressions.prewhere_info->row_level_filter)
{
auto row_level_filter_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
expressions.prewhere_info->row_level_filter_actions,
expressions.prewhere_info->row_level_filter,
expressions.prewhere_info->row_level_column_name,
false);
@ -978,18 +978,6 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
prewhere_step->setStepDescription("PREWHERE");
query_plan.addStep(std::move(prewhere_step));
// To remove additional columns in dry run
// For example, sample column which can be removed in this stage
// TODO There seems to be no place initializing remove_columns_actions
if (expressions.prewhere_info->remove_columns_actions)
{
auto remove_columns = std::make_unique<ExpressionStep>(
query_plan.getCurrentDataStream(), expressions.prewhere_info->remove_columns_actions);
remove_columns->setStepDescription("Remove unnecessary columns after PREWHERE");
query_plan.addStep(std::move(remove_columns));
}
}
}
else
@ -1479,33 +1467,29 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(
if (prewhere_info.alias_actions)
{
pipe.addSimpleTransform(
[&](const Block & header) { return std::make_shared<ExpressionTransform>(header, prewhere_info.alias_actions); });
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header,
std::make_shared<ExpressionActions>(prewhere_info.alias_actions));
});
}
if (prewhere_info.row_level_filter)
{
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(header, prewhere_info.row_level_filter, prewhere_info.row_level_column_name, true);
return std::make_shared<FilterTransform>(header,
std::make_shared<ExpressionActions>(prewhere_info.row_level_filter),
prewhere_info.row_level_column_name, true);
});
}
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(
header, prewhere_info.prewhere_actions, prewhere_info.prewhere_column_name, prewhere_info.remove_prewhere_column);
header, std::make_shared<ExpressionActions>(prewhere_info.prewhere_actions),
prewhere_info.prewhere_column_name, prewhere_info.remove_prewhere_column);
});
// To remove additional columns
// In some cases, we did not read any marks so that the pipeline.streams is empty
// Thus, some columns in prewhere are not removed as expected
// This leads to mismatched header in distributed table
if (prewhere_info.remove_columns_actions)
{
pipe.addSimpleTransform(
[&](const Block & header) { return std::make_shared<ExpressionTransform>(header, prewhere_info.remove_columns_actions); });
}
}
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
@ -1560,7 +1544,7 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
if (does_storage_support_prewhere && settings.optimize_move_to_prewhere)
{
/// Execute row level filter in prewhere as a part of "move to prewhere" optimization.
expressions.prewhere_info = std::make_shared<PrewhereDAGInfo>(
expressions.prewhere_info = std::make_shared<PrewhereInfo>(
std::move(expressions.filter_info->actions),
std::move(expressions.filter_info->column_name));
expressions.prewhere_info->prewhere_actions->projectInput(false);
@ -1572,9 +1556,9 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
else
{
/// Add row level security actions to prewhere.
expressions.prewhere_info->row_level_filter_actions = std::move(expressions.filter_info->actions);
expressions.prewhere_info->row_level_filter = std::move(expressions.filter_info->actions);
expressions.prewhere_info->row_level_column_name = std::move(expressions.filter_info->column_name);
expressions.prewhere_info->row_level_filter_actions->projectInput(false);
expressions.prewhere_info->row_level_filter->projectInput(false);
expressions.filter_info = nullptr;
}
}
@ -1613,9 +1597,9 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
auto prewhere_required_columns = prewhere_info->prewhere_actions->getRequiredColumns().getNames();
required_columns_from_prewhere.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
if (prewhere_info->row_level_filter_actions)
if (prewhere_info->row_level_filter)
{
auto row_level_required_columns = prewhere_info->row_level_filter_actions->getRequiredColumns().getNames();
auto row_level_required_columns = prewhere_info->row_level_filter->getRequiredColumns().getNames();
required_columns_from_prewhere.insert(row_level_required_columns.begin(), row_level_required_columns.end());
}
}
@ -1898,28 +1882,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
auto & prewhere_info = analysis_result.prewhere_info;
if (prewhere_info)
{
auto actions_settings = ExpressionActionsSettings::fromContext(context, CompileExpressions::yes);
query_info.prewhere_info = std::make_shared<PrewhereInfo>();
query_info.prewhere_info->prewhere_actions
= std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions, actions_settings);
if (prewhere_info->row_level_filter_actions)
query_info.prewhere_info->row_level_filter
= std::make_shared<ExpressionActions>(prewhere_info->row_level_filter_actions, actions_settings);
if (prewhere_info->alias_actions)
query_info.prewhere_info->alias_actions
= std::make_shared<ExpressionActions>(prewhere_info->alias_actions, actions_settings);
if (prewhere_info->remove_columns_actions)
query_info.prewhere_info->remove_columns_actions
= std::make_shared<ExpressionActions>(prewhere_info->remove_columns_actions, actions_settings);
query_info.prewhere_info->prewhere_column_name = prewhere_info->prewhere_column_name;
query_info.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column;
query_info.prewhere_info->row_level_column_name = prewhere_info->row_level_column_name;
query_info.prewhere_info->need_filter = prewhere_info->need_filter;
}
query_info.prewhere_info = prewhere_info;
/// Create optimizer with prepared actions.
/// Maybe we will need to calc input_order_info later, e.g. while reading from StorageMerge.

View File

@ -98,12 +98,12 @@ Block getHeaderForProcessingStage(
if (prewhere_info.row_level_filter)
{
prewhere_info.row_level_filter->execute(header);
header = prewhere_info.row_level_filter->updateHeader(std::move(header));
header.erase(prewhere_info.row_level_column_name);
}
if (prewhere_info.prewhere_actions)
prewhere_info.prewhere_actions->execute(header);
header = prewhere_info.prewhere_actions->updateHeader(std::move(header));
if (prewhere_info.remove_prewhere_column)
header.erase(prewhere_info.prewhere_column_name);

View File

@ -94,6 +94,7 @@ ReadFromMergeTree::ReadFromMergeTree(
, data(data_)
, query_info(query_info_)
, prewhere_info(getPrewhereInfo(query_info))
, actions_settings(ExpressionActionsSettings::fromContext(context_))
, metadata_snapshot(std::move(metadata_snapshot_))
, metadata_snapshot_base(std::move(metadata_snapshot_base_))
, context(std::move(context_))
@ -157,7 +158,7 @@ Pipe ReadFromMergeTree::readFromPool(
i, pool, min_marks_for_concurrent_read, max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
data, metadata_snapshot, use_uncompressed_cache,
prewhere_info, reader_settings, virt_column_names);
prewhere_info, actions_settings, reader_settings, virt_column_names);
if (i == 0)
{
@ -180,7 +181,7 @@ ProcessorPtr ReadFromMergeTree::createSource(
return std::make_shared<TSource>(
data, metadata_snapshot, part.data_part, max_block_size, preferred_block_size_bytes,
preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache,
prewhere_info, true, reader_settings, virt_column_names, part.part_index_in_query);
prewhere_info, actions_settings, true, reader_settings, virt_column_names, part.part_index_in_query);
}
Pipe ReadFromMergeTree::readInOrder(

View File

@ -90,6 +90,7 @@ private:
const MergeTreeData & data;
SelectQueryInfo query_info;
PrewhereInfoPtr prewhere_info;
ExpressionActionsSettings actions_settings;
StorageMetadataPtr metadata_snapshot;
StorageMetadataPtr metadata_snapshot_base;

View File

@ -198,7 +198,7 @@ NameDependencies IStorage::getDependentViewsByColumn(ContextPtr context) const
return name_deps;
}
std::string PrewhereDAGInfo::dump() const
std::string PrewhereInfo::dump() const
{
WriteBufferFromOwnString ss;
ss << "PrewhereDagInfo\n";
@ -213,11 +213,6 @@ std::string PrewhereDAGInfo::dump() const
ss << "prewhere_actions " << prewhere_actions->dumpDAG() << "\n";
}
if (remove_columns_actions)
{
ss << "remove_columns_actions " << remove_columns_actions->dumpDAG() << "\n";
}
ss << "remove_prewhere_column " << remove_prewhere_column
<< ", need_filter " << need_filter << "\n";

View File

@ -26,6 +26,7 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
ExpressionActionsSettings actions_settings,
UInt64 max_block_size_rows_,
UInt64 preferred_block_size_bytes_,
UInt64 preferred_max_column_in_block_size_bytes_,
@ -49,6 +50,23 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
for (auto it = virt_column_names.rbegin(); it != virt_column_names.rend(); ++it)
if (header_without_virtual_columns.has(*it))
header_without_virtual_columns.erase(*it);
if (prewhere_info)
{
prewhere_actions = std::make_unique<PrewhereActions>();
if (prewhere_info->alias_actions)
prewhere_actions->alias_actions = std::make_shared<ExpressionActions>(prewhere_info->alias_actions, actions_settings);
if (prewhere_info->row_level_filter)
prewhere_actions->row_level_filter = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter, actions_settings);
prewhere_actions->prewhere_actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions, actions_settings);
prewhere_actions->row_level_column_name = prewhere_info->row_level_column_name;
prewhere_actions->prewhere_column_name = prewhere_info->prewhere_column_name;
prewhere_actions->remove_prewhere_column = prewhere_info->remove_prewhere_column;
prewhere_actions->need_filter = prewhere_info->need_filter;
}
}
@ -78,14 +96,14 @@ void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & cu
{
if (reader->getColumns().empty())
{
current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, true);
current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), true);
}
else
{
MergeTreeRangeReader * pre_reader_ptr = nullptr;
if (pre_reader != nullptr)
{
current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, false);
current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), false);
pre_reader_ptr = &current_task.pre_range_reader;
}
@ -396,16 +414,17 @@ void MergeTreeBaseSelectProcessor::injectVirtualColumns(
chunk.setColumns(columns, num_rows);
}
void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info)
Block MergeTreeBaseSelectProcessor::transformHeader(
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns)
{
if (prewhere_info)
{
if (prewhere_info->alias_actions)
prewhere_info->alias_actions->execute(block);
block = prewhere_info->alias_actions->updateHeader(std::move(block));
if (prewhere_info->row_level_filter)
{
prewhere_info->row_level_filter->execute(block);
block = prewhere_info->row_level_filter->updateHeader(std::move(block));
auto & row_level_column = block.getByName(prewhere_info->row_level_column_name);
if (!row_level_column.type->canBeUsedInBooleanContext())
{
@ -417,7 +436,7 @@ void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const P
}
if (prewhere_info->prewhere_actions)
prewhere_info->prewhere_actions->execute(block);
block = prewhere_info->prewhere_actions->updateHeader(std::move(block));
auto & prewhere_column = block.getByName(prewhere_info->prewhere_column_name);
if (!prewhere_column.type->canBeUsedInBooleanContext())
@ -434,12 +453,7 @@ void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const P
ctn.column = ctn.type->createColumnConst(block.rows(), 1u)->convertToFullColumnIfConst();
}
}
}
Block MergeTreeBaseSelectProcessor::transformHeader(
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns)
{
executePrewhereActions(block, prewhere_info);
injectVirtualColumns(block, nullptr, partition_value_type, virtual_columns);
return block;
}

View File

@ -13,7 +13,7 @@ namespace DB
class IMergeTreeReader;
class UncompressedCache;
class MarkCache;
struct PrewhereActions;
/// Base class for MergeTreeThreadSelectProcessor and MergeTreeSelectProcessor
class MergeTreeBaseSelectProcessor : public SourceWithProgress
@ -24,6 +24,7 @@ public:
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
ExpressionActionsSettings actions_settings,
UInt64 max_block_size_rows_,
UInt64 preferred_block_size_bytes_,
UInt64 preferred_max_column_in_block_size_bytes_,
@ -36,8 +37,6 @@ public:
static Block transformHeader(
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns);
static void executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info);
protected:
Chunk generate() final;
@ -61,6 +60,7 @@ protected:
StorageMetadataPtr metadata_snapshot;
PrewhereInfoPtr prewhere_info;
std::unique_ptr<PrewhereActions> prewhere_actions;
UInt64 max_block_size_rows;
UInt64 preferred_block_size_bytes;

View File

@ -272,16 +272,16 @@ MergeTreeReadTaskColumns getReadTaskColumns(
if (prewhere_info)
{
if (prewhere_info->alias_actions)
pre_column_names = prewhere_info->alias_actions->getRequiredColumns();
pre_column_names = prewhere_info->alias_actions->getRequiredColumnsNames();
else
{
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames();
if (prewhere_info->row_level_filter)
{
NameSet names(pre_column_names.begin(), pre_column_names.end());
for (auto & name : prewhere_info->row_level_filter->getRequiredColumns())
for (auto & name : prewhere_info->row_level_filter->getRequiredColumnsNames())
{
if (names.count(name) == 0)
pre_column_names.push_back(name);

View File

@ -3940,15 +3940,9 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
if (analysis_result.prewhere_info)
{
const auto & prewhere_info = analysis_result.prewhere_info;
candidate.prewhere_info = std::make_shared<PrewhereInfo>();
candidate.prewhere_info->prewhere_column_name = prewhere_info->prewhere_column_name;
candidate.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column;
// std::cerr << fmt::format("remove prewhere column : {}", candidate.prewhere_info->remove_prewhere_column) << std::endl;
candidate.prewhere_info->row_level_column_name = prewhere_info->row_level_column_name;
candidate.prewhere_info->need_filter = prewhere_info->need_filter;
candidate.prewhere_info = analysis_result.prewhere_info;
auto prewhere_actions = prewhere_info->prewhere_actions->clone();
auto prewhere_actions = candidate.prewhere_info->prewhere_actions->clone();
auto prewhere_required_columns = required_columns;
// required_columns should not contain columns generated by prewhere
for (const auto & column : prewhere_actions->getResultColumns())
@ -3956,28 +3950,27 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
// std::cerr << fmt::format("prewhere_actions = \n{}", prewhere_actions->dumpDAG()) << std::endl;
// Prewhere_action should not add missing keys.
prewhere_required_columns = prewhere_actions->foldActionsByProjection(
prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->prewhere_column_name, false);
prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->prewhere_column_name, false);
// std::cerr << fmt::format("prewhere_actions = \n{}", prewhere_actions->dumpDAG()) << std::endl;
// std::cerr << fmt::format("prewhere_required_columns = \n{}", fmt::join(prewhere_required_columns, ", ")) << std::endl;
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->prewhere_actions = std::make_shared<ExpressionActions>(prewhere_actions, actions_settings);
candidate.prewhere_info->prewhere_actions = prewhere_actions;
if (prewhere_info->row_level_filter_actions)
if (candidate.prewhere_info->row_level_filter)
{
auto row_level_filter_actions = prewhere_info->row_level_filter_actions->clone();
auto row_level_filter_actions = candidate.prewhere_info->row_level_filter->clone();
prewhere_required_columns = row_level_filter_actions->foldActionsByProjection(
prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->row_level_column_name, false);
prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->row_level_column_name, false);
// std::cerr << fmt::format("row_level_filter_required_columns = \n{}", fmt::join(prewhere_required_columns, ", ")) << std::endl;
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->row_level_filter
= std::make_shared<ExpressionActions>(row_level_filter_actions, actions_settings);
candidate.prewhere_info->row_level_filter = row_level_filter_actions;
}
if (prewhere_info->alias_actions)
if (candidate.prewhere_info->alias_actions)
{
auto alias_actions = prewhere_info->alias_actions->clone();
auto alias_actions = candidate.prewhere_info->alias_actions->clone();
// std::cerr << fmt::format("alias_actions = \n{}", alias_actions->dumpDAG()) << std::endl;
prewhere_required_columns
= alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys, {}, false);
@ -3985,7 +3978,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
// std::cerr << fmt::format("alias_required_columns = \n{}", fmt::join(prewhere_required_columns, ", ")) << std::endl;
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->alias_actions = std::make_shared<ExpressionActions>(alias_actions, actions_settings);
candidate.prewhere_info->alias_actions = alias_actions;
}
required_columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
}

View File

@ -520,7 +520,7 @@ size_t MergeTreeRangeReader::ReadResult::countBytesInResultFilter(const IColumn:
MergeTreeRangeReader::MergeTreeRangeReader(
IMergeTreeReader * merge_tree_reader_,
MergeTreeRangeReader * prev_reader_,
const PrewhereInfoPtr & prewhere_info_,
const PrewhereActions * prewhere_info_,
bool last_reader_in_chain_)
: merge_tree_reader(merge_tree_reader_)
, index_granularity(&(merge_tree_reader->data_part->index_granularity))

View File

@ -15,6 +15,24 @@ class MergeTreeIndexGranularity;
struct PrewhereInfo;
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/// Executable form of the prewhere description used while reading MergeTree data.
/// MergeTreeBaseSelectProcessor fills it from PrewhereInfo by wrapping each ActionsDAG
/// into ExpressionActions, and MergeTreeRangeReader receives it as a non-owning pointer.
struct PrewhereActions
{
/// Actions executed to compute the alias columns that are used by the prewhere actions.
/// May be null when there are no such aliases.
ExpressionActionsPtr alias_actions;
/// Actions for row level security filter. Applied separately before prewhere_actions.
/// These actions are separate because prewhere condition should not be executed over filtered rows.
/// May be null when there is no row level filter.
ExpressionActionsPtr row_level_filter;
/// Actions which are executed on block in order to get filter column for prewhere step.
ExpressionActionsPtr prewhere_actions;
/// Name of the column produced by row_level_filter (copied from PrewhereInfo).
String row_level_column_name;
/// Name of the column produced by prewhere_actions (copied from PrewhereInfo).
String prewhere_column_name;
/// Copied from PrewhereInfo; presumably tells the reader to drop the prewhere column
/// from the result after filtering — TODO confirm against MergeTreeRangeReader.
bool remove_prewhere_column = false;
/// Copied from PrewhereInfo; exact meaning is not visible here — TODO confirm.
bool need_filter = false;
};
/// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part.
/// Stores reading state, which can be inside granule. Can skip rows in current granule and start reading from next mark.
/// Used generally for reading number of rows less than index granularity to decrease cache misses for fat blocks.
@ -24,7 +42,7 @@ public:
MergeTreeRangeReader(
IMergeTreeReader * merge_tree_reader_,
MergeTreeRangeReader * prev_reader_,
const PrewhereInfoPtr & prewhere_info_,
const PrewhereActions * prewhere_info_,
bool last_reader_in_chain_);
MergeTreeRangeReader() = default;
@ -217,7 +235,7 @@ private:
IMergeTreeReader * merge_tree_reader = nullptr;
const MergeTreeIndexGranularity * index_granularity = nullptr;
MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader firstly.
PrewhereInfoPtr prewhere_info;
const PrewhereActions * prewhere_info;
Stream stream;

View File

@ -23,6 +23,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
MarkRanges mark_ranges_,
bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
ExpressionActionsSettings actions_settings,
bool check_columns,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_,
@ -31,7 +32,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
:
MergeTreeBaseSelectProcessor{
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
storage_, metadata_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},

View File

@ -27,6 +27,7 @@ public:
MarkRanges mark_ranges,
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
ExpressionActionsSettings actions_settings,
bool check_columns,
const MergeTreeReaderSettings & reader_settings,
const Names & virt_column_names = {},

View File

@ -23,6 +23,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
MarkRanges mark_ranges_,
bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
ExpressionActionsSettings actions_settings,
bool check_columns_,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_,
@ -31,7 +32,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
:
MergeTreeBaseSelectProcessor{
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
storage_, metadata_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},

View File

@ -27,6 +27,7 @@ public:
MarkRanges mark_ranges,
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
ExpressionActionsSettings actions_settings,
bool check_columns,
const MergeTreeReaderSettings & reader_settings,
const Names & virt_column_names = {},

View File

@ -19,11 +19,12 @@ MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcess
const StorageMetadataPtr & metadata_snapshot_,
const bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
ExpressionActionsSettings actions_settings,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_)
:
MergeTreeBaseSelectProcessor{
pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
thread{thread_},

View File

@ -25,7 +25,9 @@ public:
const StorageMetadataPtr & metadata_snapshot_,
const bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
ExpressionActionsSettings actions_settings,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_);
String getName() const override { return "MergeTreeThread"; }

View File

@ -21,9 +21,6 @@ using ActionsDAGPtr = std::shared_ptr<ActionsDAG>;
struct PrewhereInfo;
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
struct PrewhereDAGInfo;
using PrewhereDAGInfoPtr = std::shared_ptr<PrewhereDAGInfo>;
struct FilterInfo;
using FilterInfoPtr = std::shared_ptr<FilterInfo>;
@ -45,34 +42,19 @@ using ClusterPtr = std::shared_ptr<Cluster>;
struct PrewhereInfo
{
/// Actions executed to compute the alias columns that are used by the prewhere actions.
ExpressionActionsPtr alias_actions;
ActionsDAGPtr alias_actions;
/// Actions for row level security filter. Applied separately before prewhere_actions.
/// These actions are separate because prewhere condition should not be executed over filtered rows.
ExpressionActionsPtr row_level_filter;
ActionsDAGPtr row_level_filter;
/// Actions which are executed on block in order to get filter column for prewhere step.
ExpressionActionsPtr prewhere_actions;
/// Actions which are executed after reading from storage in order to remove unused columns.
ExpressionActionsPtr remove_columns_actions;
String row_level_column_name;
String prewhere_column_name;
bool remove_prewhere_column = false;
bool need_filter = false;
};
/// Same as PrewhereInfo, but with ActionsDAG.
struct PrewhereDAGInfo
{
ActionsDAGPtr alias_actions;
ActionsDAGPtr row_level_filter_actions;
ActionsDAGPtr prewhere_actions;
ActionsDAGPtr remove_columns_actions;
String row_level_column_name;
String prewhere_column_name;
bool remove_prewhere_column = false;
bool need_filter = false;
PrewhereDAGInfo() = default;
explicit PrewhereDAGInfo(ActionsDAGPtr prewhere_actions_, String prewhere_column_name_)
PrewhereInfo() = default;
explicit PrewhereInfo(ActionsDAGPtr prewhere_actions_, String prewhere_column_name_)
: prewhere_actions(std::move(prewhere_actions_)), prewhere_column_name(std::move(prewhere_column_name_)) {}
std::string dump() const;

View File

@ -369,13 +369,14 @@ void StorageBuffer::read(
{
if (query_info.prewhere_info)
{
auto actions_settings = ExpressionActionsSettings::fromContext(local_context);
if (query_info.prewhere_info->alias_actions)
{
pipe_from_buffers.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(
header,
query_info.prewhere_info->alias_actions);
std::make_shared<ExpressionActions>(query_info.prewhere_info->alias_actions, actions_settings));
});
}
@ -385,7 +386,7 @@ void StorageBuffer::read(
{
return std::make_shared<FilterTransform>(
header,
query_info.prewhere_info->row_level_filter,
std::make_shared<ExpressionActions>(query_info.prewhere_info->row_level_filter, actions_settings),
query_info.prewhere_info->row_level_column_name,
false);
});
@ -395,7 +396,7 @@ void StorageBuffer::read(
{
return std::make_shared<FilterTransform>(
header,
query_info.prewhere_info->prewhere_actions,
std::make_shared<ExpressionActions>(query_info.prewhere_info->prewhere_actions, actions_settings),
query_info.prewhere_info->prewhere_column_name,
query_info.prewhere_info->remove_prewhere_column);
});