Store filter info in prewhere info instead of multiple prewheres

Some cleanups
This commit is contained in:
Denis Glazachev 2021-02-14 02:07:13 +04:00
parent 6aa86846ac
commit 79592b73f8
23 changed files with 320 additions and 232 deletions

View File

@ -1320,7 +1320,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
bool first_stage_,
bool second_stage_,
bool only_types,
const FilterInfoPtr & filter_info_,
const FilterDAGInfoPtr & filter_info_,
const Block & source_header)
: first_stage(first_stage_)
, second_stage(second_stage_)
@ -1383,7 +1383,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
if (storage && filter_info_)
{
filter_info = filter_info_;
query_analyzer.appendPreliminaryFilter(chain, filter_info->actions_dag, filter_info->column_name);
query_analyzer.appendPreliminaryFilter(chain, filter_info->actions, filter_info->column_name);
}
if (auto actions = query_analyzer.appendPrewhere(chain, !first_stage, additional_required_columns_after_prewhere))
@ -1583,7 +1583,7 @@ void ExpressionAnalysisResult::finalize(const ExpressionActionsChain & chain, si
void ExpressionAnalysisResult::removeExtraColumns() const
{
if (hasFilter())
filter_info->actions_dag->projectInput();
filter_info->actions->projectInput();
if (hasWhere())
before_where->projectInput();
if (hasHaving())

View File

@ -215,7 +215,7 @@ struct ExpressionAnalysisResult
NameSet columns_to_remove_after_prewhere;
PrewhereDAGInfoPtr prewhere_info;
FilterInfoPtr filter_info;
FilterDAGInfoPtr filter_info;
ConstantFilterDescription prewhere_constant_filter_description;
ConstantFilterDescription where_constant_filter_description;
/// Actions by every element of ORDER BY
@ -230,7 +230,7 @@ struct ExpressionAnalysisResult
bool first_stage,
bool second_stage,
bool only_types,
const FilterInfoPtr & filter_info,
const FilterDAGInfoPtr & filter_info,
const Block & source_header);
/// Filter for row-level security.

View File

@ -447,10 +447,10 @@ InterpreterSelectQuery::InterpreterSelectQuery(
/// Fix source_header for filter actions.
if (row_policy_filter)
{
filter_info = std::make_shared<FilterInfo>();
filter_info->column_name = generateFilterActions(filter_info->actions_dag, row_policy_filter, required_columns);
filter_info = std::make_shared<FilterDAGInfo>();
filter_info->column_name = generateFilterActions(filter_info->actions, row_policy_filter, required_columns);
source_header = metadata_snapshot->getSampleBlockForColumns(
filter_info->actions_dag->getRequiredColumns().getNames(), storage->getVirtuals(), storage->getStorageID());
filter_info->actions->getRequiredColumns().getNames(), storage->getVirtuals(), storage->getStorageID());
}
}
@ -807,9 +807,12 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
bool intermediate_stage = false;
bool to_aggregation_stage = false;
bool from_aggregation_stage = false;
const bool filter_in_prewhere = (
(settings.optimize_move_to_prewhere || expressions.prewhere_info) &&
!input && !input_pipe && storage && storage->supportsPrewhere()
const bool execute_row_level_filter_in_prewhere = (
(
settings.optimize_move_to_prewhere || // ...when it is allowed to move things to prewhere, so we do it for row-level filter actions too.
expressions.prewhere_info // ...or when we already have prewhere and must execute row-level filter before it.
) &&
!input && !input_pipe && storage && storage->supportsPrewhere() // Check that prewhere can be used at all.
);
if (options.only_analyze)
@ -817,11 +820,11 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
auto read_nothing = std::make_unique<ReadNothingStep>(source_header);
query_plan.addStep(std::move(read_nothing));
if (expressions.filter_info && filter_in_prewhere)
if (expressions.filter_info && execute_row_level_filter_in_prewhere)
{
auto row_level_security_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
expressions.filter_info->actions_dag,
expressions.filter_info->actions,
expressions.filter_info->column_name,
expressions.filter_info->do_remove_column);
@ -880,7 +883,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
to_aggregation_stage = true;
/// Read the data from Storage. from_stage - to what stage the request was completed in Storage.
executeFetchColumns(from_stage, query_plan, filter_in_prewhere);
executeFetchColumns(from_stage, query_plan, execute_row_level_filter_in_prewhere);
LOG_TRACE(log, "{} -> {}", QueryProcessingStage::toString(from_stage), QueryProcessingStage::toString(options.to_stage));
}
@ -945,11 +948,11 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (expressions.first_stage)
{
if (expressions.filter_info && !filter_in_prewhere)
if (expressions.filter_info && !execute_row_level_filter_in_prewhere)
{
auto row_level_security_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
expressions.filter_info->actions_dag,
expressions.filter_info->actions,
expressions.filter_info->column_name,
expressions.filter_info->do_remove_column);
@ -1200,40 +1203,55 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(QueryPlan & query_plan, c
{
Pipe pipe(std::make_shared<NullSource>(source_header));
if (query_info.prewhere_info_list)
if (query_info.prewhere_info)
{
for (const auto & prewhere_info : *query_info.prewhere_info_list)
auto & prewhere_info = *query_info.prewhere_info;
if (prewhere_info.filter_info)
{
if (prewhere_info.alias_actions)
{
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(
header, prewhere_info.alias_actions);
});
}
auto & filter_info = *prewhere_info.filter_info;
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(
header,
prewhere_info.prewhere_actions,
prewhere_info.prewhere_column_name,
prewhere_info.remove_prewhere_column);
filter_info.actions,
filter_info.column_name,
filter_info.do_remove_column);
});
}
// To remove additional columns
// In some cases, we did not read any marks so that the pipeline.streams is empty
// Thus, some columns in prewhere are not removed as expected
// This leads to mismatched header in distributed table
if (prewhere_info.remove_columns_actions)
if (prewhere_info.alias_actions)
{
pipe.addSimpleTransform([&](const Block & header)
{
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(
header, prewhere_info.remove_columns_actions);
});
}
return std::make_shared<ExpressionTransform>(
header,
prewhere_info.alias_actions);
});
}
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(
header,
prewhere_info.prewhere_actions,
prewhere_info.prewhere_column_name,
prewhere_info.remove_prewhere_column);
});
// To remove additional columns
// In some cases, we did not read any marks so that the pipeline.streams is empty
// Thus, some columns in prewhere are not removed as expected
// This leads to mismatched header in distributed table
if (prewhere_info.remove_columns_actions)
{
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(
header,
prewhere_info.remove_columns_actions);
});
}
}
@ -1242,7 +1260,7 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(QueryPlan & query_plan, c
query_plan.addStep(std::move(read_from_pipe));
}
void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan, bool filter_in_prewhere)
void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan, bool execute_row_level_filter_in_prewhere)
{
auto & query = getSelectQuery();
const Settings & settings = context->getSettingsRef();
@ -1569,47 +1587,33 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
query_info.syntax_analyzer_result = syntax_analyzer_result;
query_info.sets = query_analyzer->getPreparedSets();
if (expressions.filter_info && filter_in_prewhere)
{
if (!query_info.prewhere_info_list)
query_info.prewhere_info_list = std::make_shared<PrewhereInfoList>();
query_info.prewhere_info_list->emplace(
query_info.prewhere_info_list->begin(),
std::make_shared<ExpressionActions>(expressions.filter_info->actions_dag),
expressions.filter_info->column_name);
if (alias_actions)
{
query_info.prewhere_info_list->back().alias_actions = std::make_shared<ExpressionActions>(alias_actions);
alias_actions = nullptr;
}
auto & new_filter_info = query_info.prewhere_info_list->front();
new_filter_info.remove_prewhere_column = expressions.filter_info->do_remove_column;
new_filter_info.need_filter = true;
}
if (prewhere_info)
{
if (!query_info.prewhere_info_list)
query_info.prewhere_info_list = std::make_shared<PrewhereInfoList>();
query_info.prewhere_info = std::make_shared<PrewhereInfo>();
query_info.prewhere_info_list->emplace_back(
std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions),
prewhere_info->prewhere_column_name);
if (expressions.filter_info && execute_row_level_filter_in_prewhere)
{
query_info.prewhere_info->filter_info = std::make_shared<FilterInfo>();
auto & new_prewhere_info = query_info.prewhere_info_list->back();
if (expressions.filter_info->actions)
query_info.prewhere_info->filter_info->actions = std::make_shared<ExpressionActions>(expressions.filter_info->actions);
query_info.prewhere_info->filter_info->column_name = expressions.filter_info->column_name;
query_info.prewhere_info->filter_info->do_remove_column = expressions.filter_info->do_remove_column;
}
if (prewhere_info->alias_actions)
new_prewhere_info.alias_actions = std::make_shared<ExpressionActions>(prewhere_info->alias_actions);
query_info.prewhere_info->alias_actions = std::make_shared<ExpressionActions>(prewhere_info->alias_actions);
if (prewhere_info->prewhere_actions)
query_info.prewhere_info->prewhere_actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions);
if (prewhere_info->remove_columns_actions)
new_prewhere_info.remove_columns_actions = std::make_shared<ExpressionActions>(prewhere_info->remove_columns_actions);
query_info.prewhere_info->remove_columns_actions = std::make_shared<ExpressionActions>(prewhere_info->remove_columns_actions);
new_prewhere_info.remove_prewhere_column = prewhere_info->remove_prewhere_column;
new_prewhere_info.need_filter = prewhere_info->need_filter;
query_info.prewhere_info->prewhere_column_name = prewhere_info->prewhere_column_name;
query_info.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column;
query_info.prewhere_info->need_filter = prewhere_info->need_filter;
}
/// Create optimizer with prepared actions.

View File

@ -108,7 +108,7 @@ private:
/// Different stages of query execution.
void executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan, bool filter_in_prewhere);
void executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan, bool execute_row_level_filter_in_prewhere);
void executeWhere(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool remove_filter);
void executeAggregation(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info);
void executeMergeAggregated(QueryPlan & query_plan, bool overflow_row, bool final);
@ -157,7 +157,7 @@ private:
/// Is calculated in getSampleBlock. Is used later in readImpl.
ExpressionAnalysisResult analysis_result;
/// For row-level security.
FilterInfoPtr filter_info;
FilterDAGInfoPtr filter_info;
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;

View File

@ -42,14 +42,26 @@ Block getHeaderForProcessingStage(
case QueryProcessingStage::FetchColumns:
{
Block header = metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID());
if (query_info.prewhere_info_list)
if (query_info.prewhere_info)
{
for (const auto & prewhere_info : *query_info.prewhere_info_list)
auto & prewhere_info = *query_info.prewhere_info;
if (prewhere_info.filter_info)
{
prewhere_info.prewhere_actions->execute(header);
if (prewhere_info.remove_prewhere_column)
header.erase(prewhere_info.prewhere_column_name);
auto & filter_info = *prewhere_info.filter_info;
if (filter_info.actions)
filter_info.actions->execute(header);
if (filter_info.do_remove_column)
header.erase(filter_info.column_name);
}
if (prewhere_info.prewhere_actions)
prewhere_info.prewhere_actions->execute(header);
if (prewhere_info.remove_prewhere_column)
header.erase(prewhere_info.prewhere_column_name);
}
return header;
}

View File

@ -203,14 +203,14 @@ std::string PrewhereDAGInfo::dump() const
return ss.str();
}
std::string FilterInfo::dump() const
std::string FilterDAGInfo::dump() const
{
WriteBufferFromOwnString ss;
ss << "FilterInfo for column '" << column_name <<"', do_remove_column "
ss << "FilterDAGInfo for column '" << column_name <<"', do_remove_column "
<< do_remove_column << "\n";
if (actions_dag)
if (actions)
{
ss << "actions_dag " << actions_dag->dumpDAG() << "\n";
ss << "actions " << actions->dumpDAG() << "\n";
}
return ss.str();

View File

@ -22,17 +22,17 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
Block header,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoListPtr & prewhere_info_list_,
const PrewhereInfoPtr & prewhere_info_,
UInt64 max_block_size_rows_,
UInt64 preferred_block_size_bytes_,
UInt64 preferred_max_column_in_block_size_bytes_,
const MergeTreeReaderSettings & reader_settings_,
bool use_uncompressed_cache_,
const Names & virt_column_names_)
: SourceWithProgress(getHeader(std::move(header), prewhere_info_list_, virt_column_names_))
: SourceWithProgress(getHeader(std::move(header), prewhere_info_, virt_column_names_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, prewhere_info_list(prewhere_info_list_)
, prewhere_info(prewhere_info_)
, max_block_size_rows(max_block_size_rows_)
, preferred_block_size_bytes(preferred_block_size_bytes_)
, preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes_)
@ -70,18 +70,18 @@ Chunk MergeTreeBaseSelectProcessor::generate()
void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & current_task)
{
if (prewhere_info_list)
if (prewhere_info)
{
if (reader->getColumns().empty())
{
current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info_list, true);
current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, true);
}
else
{
MergeTreeRangeReader * pre_reader_ptr = nullptr;
if (pre_reader != nullptr)
{
current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info_list, false);
current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, false);
pre_reader_ptr = &current_task.pre_range_reader;
}
@ -309,37 +309,60 @@ void MergeTreeBaseSelectProcessor::injectVirtualColumns(Chunk & chunk, MergeTree
chunk.setColumns(columns, num_rows);
}
void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const PrewhereInfoListPtr & prewhere_info_list)
void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info)
{
if (!prewhere_info_list)
return;
for (const auto & prewhere_info : *prewhere_info_list)
if (prewhere_info)
{
if (prewhere_info.alias_actions)
prewhere_info.alias_actions->execute(block);
if (prewhere_info->filter_info)
{
auto & filter_info = *prewhere_info->filter_info;
prewhere_info.prewhere_actions->execute(block);
auto & prewhere_column = block.getByName(prewhere_info.prewhere_column_name);
if (filter_info.actions)
filter_info.actions->execute(block);
auto & filter_column = block.getByName(filter_info.column_name);
if (!filter_column.type->canBeUsedInBooleanContext())
{
throw Exception("Invalid type for row-level security filter: " + filter_column.type->getName(),
ErrorCodes::LOGICAL_ERROR);
}
if (filter_info.do_remove_column)
block.erase(filter_info.column_name);
else
{
auto & ctn = block.getByName(filter_info.column_name);
ctn.column = ctn.type->createColumnConst(block.rows(), 1u)->convertToFullColumnIfConst();
}
}
if (prewhere_info->alias_actions)
prewhere_info->alias_actions->execute(block);
if (prewhere_info->prewhere_actions)
prewhere_info->prewhere_actions->execute(block);
auto & prewhere_column = block.getByName(prewhere_info->prewhere_column_name);
if (!prewhere_column.type->canBeUsedInBooleanContext())
{
throw Exception("Invalid type for filter in PREWHERE: " + prewhere_column.type->getName(),
ErrorCodes::LOGICAL_ERROR);
}
if (prewhere_info.remove_prewhere_column)
block.erase(prewhere_info.prewhere_column_name);
if (prewhere_info->remove_prewhere_column)
block.erase(prewhere_info->prewhere_column_name);
else
{
auto & ctn = block.getByName(prewhere_info.prewhere_column_name);
auto & ctn = block.getByName(prewhere_info->prewhere_column_name);
ctn.column = ctn.type->createColumnConst(block.rows(), 1u)->convertToFullColumnIfConst();
}
}
}
Block MergeTreeBaseSelectProcessor::getHeader(
Block block, const PrewhereInfoListPtr & prewhere_info_list, const Names & virtual_columns)
Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns)
{
executePrewhereActions(block, prewhere_info_list);
executePrewhereActions(block, prewhere_info);
injectVirtualColumns(block, nullptr, virtual_columns);
return block;
}

View File

@ -23,7 +23,7 @@ public:
Block header,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoListPtr & prewhere_info_list_,
const PrewhereInfoPtr & prewhere_info_,
UInt64 max_block_size_rows_,
UInt64 preferred_block_size_bytes_,
UInt64 preferred_max_column_in_block_size_bytes_,
@ -33,7 +33,7 @@ public:
~MergeTreeBaseSelectProcessor() override;
static void executePrewhereActions(Block & block, const PrewhereInfoListPtr & prewhere_info_list);
static void executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info);
protected:
Chunk generate() final;
@ -49,7 +49,7 @@ protected:
static void injectVirtualColumns(Block & block, MergeTreeReadTask * task, const Names & virtual_columns);
static void injectVirtualColumns(Chunk & chunk, MergeTreeReadTask * task, const Names & virtual_columns);
static Block getHeader(Block block, const PrewhereInfoListPtr & prewhere_info_list, const Names & virtual_columns);
static Block getHeader(Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns);
void initializeRangeReaders(MergeTreeReadTask & task);
@ -57,7 +57,7 @@ protected:
const MergeTreeData & storage;
StorageMetadataPtr metadata_snapshot;
PrewhereInfoListPtr prewhere_info_list;
PrewhereInfoPtr prewhere_info;
UInt64 max_block_size_rows;
UInt64 preferred_block_size_bytes;

View File

@ -118,10 +118,11 @@ NameSet injectRequiredColumns(const MergeTreeData & storage, const StorageMetada
MergeTreeReadTask::MergeTreeReadTask(
const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, const size_t part_index_in_query_,
const Names & ordered_names_, const NameSet & column_name_set_, const NamesAndTypesList & columns_,
const NamesAndTypesList & pre_columns_, const bool should_reorder_, MergeTreeBlockSizePredictorPtr && size_predictor_)
const NamesAndTypesList & pre_columns_, const bool remove_prewhere_column_, const bool should_reorder_,
MergeTreeBlockSizePredictorPtr && size_predictor_)
: data_part{data_part_}, mark_ranges{mark_ranges_}, part_index_in_query{part_index_in_query_},
ordered_names{ordered_names_}, column_name_set{column_name_set_}, columns{columns_}, pre_columns{pre_columns_},
should_reorder{should_reorder_}, size_predictor{std::move(size_predictor_)}
remove_prewhere_column{remove_prewhere_column_}, should_reorder{should_reorder_}, size_predictor{std::move(size_predictor_)}
{
}
@ -257,7 +258,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData::DataPartPtr & data_part,
const Names & required_columns,
const PrewhereInfoListPtr & prewhere_info_list,
const PrewhereInfoPtr & prewhere_info,
bool check_columns)
{
Names column_names = required_columns;
@ -266,12 +267,22 @@ MergeTreeReadTaskColumns getReadTaskColumns(
/// inject columns required for defaults evaluation
bool should_reorder = !injectRequiredColumns(storage, metadata_snapshot, data_part, column_names).empty();
if (prewhere_info_list)
if (prewhere_info)
{
for (const auto & prewhere_info : *prewhere_info_list)
if (prewhere_info->filter_info && prewhere_info->filter_info->actions)
{
const auto required_column_names = (prewhere_info.alias_actions ?
prewhere_info.alias_actions->getRequiredColumns() : prewhere_info.prewhere_actions->getRequiredColumns());
const auto required_column_names = prewhere_info->filter_info->actions->getRequiredColumns();
pre_column_names.insert(pre_column_names.end(), required_column_names.begin(), required_column_names.end());
}
if (prewhere_info->alias_actions)
{
const auto required_column_names = prewhere_info->alias_actions->getRequiredColumns();
pre_column_names.insert(pre_column_names.end(), required_column_names.begin(), required_column_names.end());
}
else
{
const auto required_column_names = prewhere_info->prewhere_actions->getRequiredColumns();
pre_column_names.insert(pre_column_names.end(), required_column_names.begin(), required_column_names.end());
}

View File

@ -42,6 +42,8 @@ struct MergeTreeReadTask
const NamesAndTypesList & columns;
/// column names to read during PREWHERE
const NamesAndTypesList & pre_columns;
/// should PREWHERE column be returned to requesting side?
const bool remove_prewhere_column;
/// resulting block may require reordering in accordance with `ordered_names`
const bool should_reorder;
/// Used to satistfy preferred_block_size_bytes limitation
@ -55,7 +57,8 @@ struct MergeTreeReadTask
MergeTreeReadTask(
const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, const size_t part_index_in_query_,
const Names & ordered_names_, const NameSet & column_name_set_, const NamesAndTypesList & columns_,
const NamesAndTypesList & pre_columns_, const bool should_reorder_, MergeTreeBlockSizePredictorPtr && size_predictor_);
const NamesAndTypesList & pre_columns_, const bool remove_prewhere_column_, const bool should_reorder_,
MergeTreeBlockSizePredictorPtr && size_predictor_);
virtual ~MergeTreeReadTask();
};
@ -75,7 +78,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData::DataPartPtr & data_part,
const Names & required_columns,
const PrewhereInfoListPtr & prewhere_info_list,
const PrewhereInfoPtr & prewhere_info,
bool check_columns);
struct MergeTreeBlockSizePredictor

View File

@ -834,20 +834,14 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
plan->addStep(std::move(adding_column));
}
if (query_info.prewhere_info_list)
if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions)
{
for (const auto & prewhere_info : *query_info.prewhere_info_list)
{
if (prewhere_info.remove_columns_actions)
{
auto expression_step = std::make_unique<ExpressionStep>(
plan->getCurrentDataStream(),
prewhere_info.remove_columns_actions->getActionsDAG().clone());
auto expression_step = std::make_unique<ExpressionStep>(
plan->getCurrentDataStream(),
query_info.prewhere_info->remove_columns_actions->getActionsDAG().clone());
expression_step->setStepDescription("Remove unused columns after PREWHERE");
plan->addStep(std::move(expression_step));
}
}
expression_step->setStepDescription("Remove unused columns after PREWHERE");
plan->addStep(std::move(expression_step));
}
return plan;
@ -983,7 +977,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
std::move(parts),
data,
metadata_snapshot,
query_info.prewhere_info_list,
query_info.prewhere_info,
true,
column_names,
MergeTreeReadPool::BackoffSettings(settings),
@ -999,7 +993,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
i, pool, min_marks_for_concurrent_read, max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
data, metadata_snapshot, use_uncompressed_cache,
query_info.prewhere_info_list, reader_settings, virt_columns);
query_info.prewhere_info, reader_settings, virt_columns);
if (i == 0)
{
@ -1022,7 +1016,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
auto source = std::make_shared<MergeTreeSelectProcessor>(
data, metadata_snapshot, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
query_info.prewhere_info_list, true, reader_settings, virt_columns, part.part_index_in_query);
query_info.prewhere_info, true, reader_settings, virt_columns, part.part_index_in_query);
res.emplace_back(std::move(source));
}
@ -1223,7 +1217,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
column_names,
ranges_to_get_from_part,
use_uncompressed_cache,
query_info.prewhere_info_list,
query_info.prewhere_info,
true,
reader_settings,
virt_columns,
@ -1241,7 +1235,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
column_names,
ranges_to_get_from_part,
use_uncompressed_cache,
query_info.prewhere_info_list,
query_info.prewhere_info,
true,
reader_settings,
virt_columns,
@ -1395,7 +1389,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
column_names,
part_it->ranges,
use_uncompressed_cache,
query_info.prewhere_info_list,
query_info.prewhere_info,
true,
reader_settings,
virt_columns,

View File

@ -543,12 +543,12 @@ size_t MergeTreeRangeReader::ReadResult::countBytesInResultFilter(const IColumn:
MergeTreeRangeReader::MergeTreeRangeReader(
IMergeTreeReader * merge_tree_reader_,
MergeTreeRangeReader * prev_reader_,
const PrewhereInfoListPtr & prewhere_info_list_,
const PrewhereInfoPtr & prewhere_info_,
bool last_reader_in_chain_)
: merge_tree_reader(merge_tree_reader_)
, index_granularity(&(merge_tree_reader->data_part->index_granularity))
, prev_reader(prev_reader_)
, prewhere_info_list(prewhere_info_list_)
, prewhere_info(prewhere_info_)
, last_reader_in_chain(last_reader_in_chain_)
, is_initialized(true)
{
@ -558,19 +558,25 @@ MergeTreeRangeReader::MergeTreeRangeReader(
for (const auto & name_and_type : merge_tree_reader->getColumns())
sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name});
if (prewhere_info_list)
if (prewhere_info)
{
for (const auto & prewhere_info : *prewhere_info_list)
if (prewhere_info->filter_info)
{
if (prewhere_info.alias_actions)
prewhere_info.alias_actions->execute(sample_block, true);
if (prewhere_info->filter_info->actions)
prewhere_info->filter_info->actions->execute(sample_block, true);
if (prewhere_info.prewhere_actions)
prewhere_info.prewhere_actions->execute(sample_block, true);
if (prewhere_info.remove_prewhere_column)
sample_block.erase(prewhere_info.prewhere_column_name);
if (prewhere_info->filter_info->do_remove_column)
sample_block.erase(prewhere_info->filter_info->column_name);
}
if (prewhere_info->alias_actions)
prewhere_info->alias_actions->execute(sample_block, true);
if (prewhere_info->prewhere_actions)
prewhere_info->prewhere_actions->execute(sample_block, true);
if (prewhere_info->remove_prewhere_column)
sample_block.erase(prewhere_info->prewhere_column_name);
}
}
@ -860,7 +866,7 @@ Columns MergeTreeRangeReader::continueReadingChain(ReadResult & result, size_t &
void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & result)
{
if (!prewhere_info_list || prewhere_info_list->empty())
if (!prewhere_info)
return;
const auto & header = merge_tree_reader->getColumns();
@ -890,29 +896,37 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
for (auto name_and_type = header.begin(); pos < num_columns; ++pos, ++name_and_type)
block.insert({result.columns[pos], name_and_type->type, name_and_type->name});
for (size_t i = 0; i < prewhere_info_list->size(); ++i)
if (prewhere_info->filter_info)
{
const auto & prewhere_info = (*prewhere_info_list)[i];
if (prewhere_info->filter_info->actions)
prewhere_info->filter_info->actions->execute(block);
if (prewhere_info.alias_actions)
prewhere_info.alias_actions->execute(block);
const auto filter_column_pos = block.getPositionByName(prewhere_info->filter_info->column_name);
result.addFilter(block.getByPosition(filter_column_pos).column);
/// Columns might be projected out. We need to store them here so that default columns can be evaluated later.
result.block_before_prewhere = block;
prewhere_info.prewhere_actions->execute(block);
prewhere_column_pos = block.getPositionByName(prewhere_info.prewhere_column_name);
result.addFilter(block.getByPosition(prewhere_column_pos).column);
if (i + 1 != prewhere_info_list->size() && prewhere_info.remove_prewhere_column)
block.erase(prewhere_column_pos);
if (prewhere_info->filter_info->do_remove_column)
block.erase(prewhere_info->filter_info->column_name);
else
block.getByPosition(prewhere_column_pos).column = block.getByPosition(prewhere_column_pos).type->createColumnConst(result.num_rows, 1);
block.getByPosition(filter_column_pos).column = block.getByPosition(filter_column_pos).type->createColumnConst(result.num_rows, 1);
}
block.getByPosition(prewhere_column_pos).column = nullptr;
if (prewhere_info->alias_actions)
prewhere_info->alias_actions->execute(block);
/// Columns might be projected out. We need to store them here so that default columns can be evaluated later.
result.block_before_prewhere = block;
if (prewhere_info->prewhere_actions)
prewhere_info->prewhere_actions->execute(block);
prewhere_column_pos = block.getPositionByName(prewhere_info->prewhere_column_name);
result.addFilter(block.getByPosition(prewhere_column_pos).column);
block.getByPosition(prewhere_column_pos).column.reset();
result.columns.clear();
result.columns.reserve(block.columns());
for (auto & col : block)
result.columns.emplace_back(std::move(col.column));
}
@ -925,7 +939,7 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
if (result.totalRowsPerGranule() == 0)
result.setFilterConstFalse();
/// If we need to filter in PREWHERE
else if (prewhere_info_list->back().need_filter || result.need_filter)
else if (prewhere_info->need_filter || result.need_filter)
{
/// If there is a filter and without optimized
if (result.getFilter() && last_reader_in_chain)
@ -966,11 +980,11 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
/// Check if the PREWHERE column is needed
if (!result.columns.empty())
{
if (prewhere_info_list->back().remove_prewhere_column)
if (prewhere_info->remove_prewhere_column)
result.columns.erase(result.columns.begin() + prewhere_column_pos);
else
result.columns[prewhere_column_pos] =
getSampleBlock().getByName(prewhere_info_list->back().prewhere_column_name).type->
getSampleBlock().getByName(prewhere_info->prewhere_column_name).type->
createColumnConst(result.num_rows, 1u)->convertToFullColumnIfConst();
}
}
@ -978,7 +992,7 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
else
{
result.columns[prewhere_column_pos] = result.getFilterHolder()->convertToFullColumnIfConst();
if (getSampleBlock().getByName(prewhere_info_list->back().prewhere_column_name).type->isNullable())
if (getSampleBlock().getByName(prewhere_info->prewhere_column_name).type->isNullable())
result.columns[prewhere_column_pos] = makeNullable(std::move(result.columns[prewhere_column_pos]));
result.clearFilter(); // Acting as a flag to not filter in PREWHERE
}

View File

@ -13,8 +13,7 @@ using ColumnUInt8 = ColumnVector<UInt8>;
class IMergeTreeReader;
class MergeTreeIndexGranularity;
struct PrewhereInfo;
using PrewhereInfoList = std::vector<PrewhereInfo>;
using PrewhereInfoListPtr = std::shared_ptr<PrewhereInfoList>;
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
/// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part.
/// Stores reading state, which can be inside granule. Can skip rows in current granule and start reading from next mark.
@ -25,7 +24,7 @@ public:
MergeTreeRangeReader(
IMergeTreeReader * merge_tree_reader_,
MergeTreeRangeReader * prev_reader_,
const PrewhereInfoListPtr & prewhere_info_list,
const PrewhereInfoPtr & prewhere_info_,
bool last_reader_in_chain_);
MergeTreeRangeReader() = default;
@ -218,7 +217,7 @@ private:
IMergeTreeReader * merge_tree_reader = nullptr;
const MergeTreeIndexGranularity * index_granularity = nullptr;
MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader firstly.
PrewhereInfoListPtr prewhere_info_list;
PrewhereInfoPtr prewhere_info;
Stream stream;

View File

@ -24,7 +24,7 @@ MergeTreeReadPool::MergeTreeReadPool(
RangesInDataParts && parts_,
const MergeTreeData & data_,
const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoListPtr & prewhere_info_list_,
const PrewhereInfoPtr & prewhere_info_,
const bool check_columns_,
const Names & column_names_,
const BackoffSettings & backoff_settings_,
@ -37,7 +37,7 @@ MergeTreeReadPool::MergeTreeReadPool(
, column_names{column_names_}
, do_not_steal_tasks{do_not_steal_tasks_}
, predict_block_size_bytes{preferred_block_size_bytes_ > 0}
, prewhere_info_list{prewhere_info_list_}
, prewhere_info{prewhere_info_}
, parts_ranges{std::move(parts_)}
{
/// parts don't contain duplicate MergeTreeDataPart's.
@ -139,7 +139,7 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read,
return std::make_unique<MergeTreeReadTask>(
part.data_part, ranges_to_get_from_part, part.part_index_in_query, ordered_names,
per_part_column_name_set[part_idx], per_part_columns[part_idx], per_part_pre_columns[part_idx],
per_part_should_reorder[part_idx], std::move(curr_task_size_predictor));
prewhere_info && prewhere_info->remove_prewhere_column, per_part_should_reorder[part_idx], std::move(curr_task_size_predictor));
}
MarkRanges MergeTreeReadPool::getRestMarks(const IMergeTreeDataPart & part, const MarkRange & from) const
@ -229,7 +229,7 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
per_part_sum_marks.push_back(sum_marks);
auto [required_columns, required_pre_columns, should_reorder] =
getReadTaskColumns(data, metadata_snapshot, part.data_part, column_names, prewhere_info_list, check_columns);
getReadTaskColumns(data, metadata_snapshot, part.data_part, column_names, prewhere_info, check_columns);
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & required_column_names = required_columns.getNames();

View File

@ -71,9 +71,10 @@ private:
public:
MergeTreeReadPool(
const size_t threads_, const size_t sum_marks_, const size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoListPtr & prewhere_info_list, const bool check_columns_, const Names & column_names_,
const BackoffSettings & backoff_settings_, size_t preferred_block_size_bytes_, const bool do_not_steal_tasks_ = false);
RangesInDataParts && parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_, const PrewhereInfoPtr & prewhere_info_,
const bool check_columns_, const Names & column_names_,
const BackoffSettings & backoff_settings_, size_t preferred_block_size_bytes_,
const bool do_not_steal_tasks_ = false);
MergeTreeReadTaskPtr getTask(const size_t min_marks_to_read, const size_t thread, const Names & ordered_names);
@ -106,7 +107,7 @@ private:
std::vector<NamesAndTypesList> per_part_pre_columns;
std::vector<char> per_part_should_reorder;
std::vector<MergeTreeBlockSizePredictorPtr> per_part_size_predictor;
PrewhereInfoListPtr prewhere_info_list;
PrewhereInfoPtr prewhere_info;
struct Part
{

View File

@ -22,7 +22,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
Names required_columns_,
MarkRanges mark_ranges_,
bool use_uncompressed_cache_,
const PrewhereInfoListPtr & prewhere_info_list_,
const PrewhereInfoPtr & prewhere_info_,
bool check_columns,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_,
@ -31,7 +31,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
:
MergeTreeBaseSelectProcessor{
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
storage_, metadata_snapshot_, prewhere_info_list_, max_block_size_rows_,
storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},
@ -56,7 +56,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
ordered_names = header_without_virtual_columns.getNames();
task_columns = getReadTaskColumns(storage, metadata_snapshot, data_part, required_columns, prewhere_info_list, check_columns);
task_columns = getReadTaskColumns(storage, metadata_snapshot, data_part, required_columns, prewhere_info, check_columns);
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & column_names = task_columns.columns.getNames();
@ -71,7 +71,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
all_mark_ranges, owned_uncompressed_cache.get(),
owned_mark_cache.get(), reader_settings);
if (prewhere_info_list)
if (prewhere_info)
pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
}
@ -100,7 +100,8 @@ try
task = std::make_unique<MergeTreeReadTask>(
data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set,
task_columns.columns, task_columns.pre_columns, task_columns.should_reorder, std::move(size_predictor));
task_columns.columns, task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
task_columns.should_reorder, std::move(size_predictor));
return true;
}

View File

@ -26,7 +26,7 @@ public:
Names required_columns_,
MarkRanges mark_ranges,
bool use_uncompressed_cache,
const PrewhereInfoListPtr & prewhere_info_list,
const PrewhereInfoPtr & prewhere_info,
bool check_columns,
const MergeTreeReaderSettings & reader_settings,
const Names & virt_column_names = {},

View File

@ -22,7 +22,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
Names required_columns_,
MarkRanges mark_ranges_,
bool use_uncompressed_cache_,
const PrewhereInfoListPtr & prewhere_info_list_,
const PrewhereInfoPtr & prewhere_info_,
bool check_columns_,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_,
@ -31,7 +31,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
:
MergeTreeBaseSelectProcessor{
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
storage_, metadata_snapshot_, prewhere_info_list_, max_block_size_rows_,
storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},
@ -69,7 +69,7 @@ try
task_columns = getReadTaskColumns(
storage, metadata_snapshot, data_part,
required_columns, prewhere_info_list, check_columns);
required_columns, prewhere_info, check_columns);
auto size_predictor = (preferred_block_size_bytes == 0)
? nullptr
@ -81,7 +81,8 @@ try
task = std::make_unique<MergeTreeReadTask>(
data_part, all_mark_ranges, part_index_in_query, ordered_names, column_name_set, task_columns.columns,
task_columns.pre_columns, task_columns.should_reorder, std::move(size_predictor));
task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
task_columns.should_reorder, std::move(size_predictor));
if (!reader)
{
@ -93,7 +94,7 @@ try
reader = data_part->getReader(task_columns.columns, metadata_snapshot, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
if (prewhere_info_list)
if (prewhere_info)
pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
}

View File

@ -26,7 +26,7 @@ public:
Names required_columns_,
MarkRanges mark_ranges,
bool use_uncompressed_cache,
const PrewhereInfoListPtr & prewhere_info_list,
const PrewhereInfoPtr & prewhere_info,
bool check_columns,
const MergeTreeReaderSettings & reader_settings,
const Names & virt_column_names = {},

View File

@ -18,12 +18,12 @@ MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcess
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const bool use_uncompressed_cache_,
const PrewhereInfoListPtr & prewhere_info_list_,
const PrewhereInfoPtr & prewhere_info_,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_)
:
MergeTreeBaseSelectProcessor{
pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_list_,
pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_,
max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
@ -78,7 +78,7 @@ bool MergeTreeThreadSelectBlockInputProcessor::getNewTask()
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback);
if (prewhere_info_list)
if (prewhere_info)
pre_reader = task->data_part->getReader(task->pre_columns, metadata_snapshot, rest_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback);
@ -94,7 +94,7 @@ bool MergeTreeThreadSelectBlockInputProcessor::getNewTask()
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
reader->getAvgValueSizeHints(), profile_callback);
if (prewhere_info_list)
if (prewhere_info)
pre_reader = task->data_part->getReader(task->pre_columns, metadata_snapshot, rest_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
reader->getAvgValueSizeHints(), profile_callback);

View File

@ -24,7 +24,7 @@ public:
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const bool use_uncompressed_cache_,
const PrewhereInfoListPtr & prewhere_info_list_,
const PrewhereInfoPtr & prewhere_info_,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_);

View File

@ -15,8 +15,34 @@ using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class ActionsDAG;
using ActionsDAGPtr = std::shared_ptr<ActionsDAG>;
struct PrewhereInfo;
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
struct PrewhereDAGInfo;
using PrewhereDAGInfoPtr = std::shared_ptr<PrewhereDAGInfo>;
struct FilterInfo;
using FilterInfoPtr = std::shared_ptr<FilterInfo>;
struct FilterDAGInfo;
using FilterDAGInfoPtr = std::shared_ptr<FilterDAGInfo>;
struct InputOrderInfo;
using InputOrderInfoPtr = std::shared_ptr<const InputOrderInfo>;
struct TreeRewriterResult;
using TreeRewriterResultPtr = std::shared_ptr<const TreeRewriterResult>;
class ReadInOrderOptimizer;
using ReadInOrderOptimizerPtr = std::shared_ptr<const ReadInOrderOptimizer>;
class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
struct PrewhereInfo
{
/// Information about the preliminary filter expression, if any.
FilterInfoPtr filter_info;
/// Actions which are executed in order to alias columns are used for prewhere actions.
ExpressionActionsPtr alias_actions;
/// Actions which are executed on block in order to get filter column for prewhere step.
@ -26,15 +52,9 @@ struct PrewhereInfo
String prewhere_column_name;
bool remove_prewhere_column = false;
bool need_filter = false;
PrewhereInfo() = default;
explicit PrewhereInfo(ExpressionActionsPtr prewhere_actions_, String prewhere_column_name_)
: prewhere_actions(std::move(prewhere_actions_)), prewhere_column_name(std::move(prewhere_column_name_)) {}
};
using PrewhereInfoList = std::vector<PrewhereInfo>;
/// Same as PrewhereInfo, but with ActionsDAG
/// Same as PrewhereInfo, but with ActionsDAG.
struct PrewhereDAGInfo
{
ActionsDAGPtr alias_actions;
@ -54,7 +74,15 @@ struct PrewhereDAGInfo
/// Helper struct to store all the information about the filter expression.
struct FilterInfo
{
ActionsDAGPtr actions_dag;
ExpressionActionsPtr actions;
String column_name;
bool do_remove_column = false;
};
/// Same as FilterInfo, but with ActionsDAG.
struct FilterDAGInfo
{
ActionsDAGPtr actions;
String column_name;
bool do_remove_column = false;
@ -77,20 +105,6 @@ struct InputOrderInfo
bool operator !=(const InputOrderInfo & other) const { return !(*this == other); }
};
using PrewhereInfoListPtr = std::shared_ptr<PrewhereInfoList>;
using PrewhereDAGInfoPtr = std::shared_ptr<PrewhereDAGInfo>;
using FilterInfoPtr = std::shared_ptr<FilterInfo>;
using InputOrderInfoPtr = std::shared_ptr<const InputOrderInfo>;
struct TreeRewriterResult;
using TreeRewriterResultPtr = std::shared_ptr<const TreeRewriterResult>;
class ReadInOrderOptimizer;
using ReadInOrderOptimizerPtr = std::shared_ptr<const ReadInOrderOptimizer>;
class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
/** Query along with some additional data,
* that can be used during query processing
* inside storage engines.
@ -106,7 +120,7 @@ struct SelectQueryInfo
TreeRewriterResultPtr syntax_analyzer_result;
PrewhereInfoListPtr prewhere_info_list;
PrewhereInfoPtr prewhere_info;
ReadInOrderOptimizerPtr order_optimizer;
/// Can be modified while reading from storage

View File

@ -314,27 +314,38 @@ void StorageBuffer::read(
}
else
{
if (query_info.prewhere_info_list)
if (query_info.prewhere_info)
{
for (const auto & prewhere_info : *query_info.prewhere_info_list)
if (query_info.prewhere_info->filter_info)
{
pipe_from_buffers.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(
header, prewhere_info.prewhere_actions,
prewhere_info.prewhere_column_name,
prewhere_info.remove_prewhere_column);
header,
query_info.prewhere_info->filter_info->actions,
query_info.prewhere_info->filter_info->column_name,
query_info.prewhere_info->filter_info->do_remove_column);
});
if (prewhere_info.alias_actions)
{
pipe_from_buffers.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(
header, prewhere_info.alias_actions);
});
}
}
if (query_info.prewhere_info->alias_actions)
{
pipe_from_buffers.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(
header,
query_info.prewhere_info->alias_actions);
});
}
pipe_from_buffers.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(
header,
query_info.prewhere_info->prewhere_actions,
query_info.prewhere_info->prewhere_column_name,
query_info.prewhere_info->remove_prewhere_column);
});
}
auto read_from_buffers = std::make_unique<ReadFromPreparedSource>(std::move(pipe_from_buffers));