Refactor more

This commit is contained in:
Nikolai Kochetov 2020-11-03 22:05:47 +03:00
parent 36f9bcdb52
commit 7b61f5d641
9 changed files with 65 additions and 78 deletions

View File

@ -1126,7 +1126,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
if (auto actions = query_analyzer.appendPrewhere(chain, !first_stage, additional_required_columns_after_prewhere))
{
prewhere_info = std::make_shared<PrewhereInfo>(actions, query.prewhere()->getColumnName());
prewhere_info = std::make_shared<PrewhereDAGInfo>(actions, query.prewhere()->getColumnName());
if (allowEarlyConstantFolding(*prewhere_info->prewhere_actions, settings))
{

View File

@ -200,7 +200,7 @@ struct ExpressionAnalysisResult
/// Columns will be removed after prewhere actions execution.
NameSet columns_to_remove_after_prewhere;
PrewhereInfoPtr prewhere_info;
PrewhereDAGInfoPtr prewhere_info;
FilterInfoPtr filter_info;
ConstantFilterDescription prewhere_constant_filter_description;
ConstantFilterDescription where_constant_filter_description;

View File

@ -1108,7 +1108,7 @@ static StreamLocalLimits getLimitsForStorage(const Settings & settings, const Se
void InterpreterSelectQuery::executeFetchColumns(
QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan,
const PrewhereInfoPtr & prewhere_info, const NameSet & columns_to_remove_after_prewhere)
const PrewhereDAGInfoPtr & prewhere_info, const NameSet & columns_to_remove_after_prewhere)
{
auto & query = getSelectQuery();
const Settings & settings = context->getSettingsRef();
@ -1430,7 +1430,21 @@ void InterpreterSelectQuery::executeFetchColumns(
query_info.query = query_ptr;
query_info.syntax_analyzer_result = syntax_analyzer_result;
query_info.sets = query_analyzer->getPreparedSets();
query_info.prewhere_info = prewhere_info;
if (prewhere_info)
{
query_info.prewhere_info = std::make_shared<PrewhereInfo>(
prewhere_info->prewhere_actions->buildExpressions(),
prewhere_info->prewhere_column_name);
if (prewhere_info->alias_actions)
query_info.prewhere_info->alias_actions = prewhere_info->alias_actions->buildExpressions();
if (prewhere_info->remove_columns_actions)
query_info.prewhere_info->remove_columns_actions = prewhere_info->remove_columns_actions->buildExpressions();
query_info.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column;
query_info.prewhere_info->need_filter = prewhere_info->need_filter;
}
/// Create optimizer with prepared actions.
/// Maybe we will need to calc input_order_info later, e.g. while reading from StorageMerge.

View File

@ -116,7 +116,7 @@ private:
void executeFetchColumns(
QueryProcessingStage::Enum processing_stage,
QueryPlan & query_plan,
const PrewhereInfoPtr & prewhere_info,
const PrewhereDAGInfoPtr & prewhere_info,
const NameSet & columns_to_remove_after_prewhere);
void executeWhere(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool remove_filter);

View File

@ -40,13 +40,6 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
, use_uncompressed_cache(use_uncompressed_cache_)
, virt_column_names(virt_column_names_)
{
if (prewhere_info)
{
if (prewhere_info->alias_actions)
prewhere_alias_actions = prewhere_info->alias_actions->buildExpressions();
prewhere_actions = prewhere_info->prewhere_actions->buildExpressions();
}
header_without_virtual_columns = getPort().getHeader();
for (auto it = virt_column_names.rbegin(); it != virt_column_names.rend(); ++it)
@ -81,39 +74,23 @@ void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & cu
{
if (reader->getColumns().empty())
{
current_task.range_reader = MergeTreeRangeReader(
pre_reader.get(), nullptr,
prewhere_alias_actions,
prewhere_actions,
prewhere_info->prewhere_column_name,
prewhere_info->remove_prewhere_column,
prewhere_info->need_filter,
true);
current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, true);
}
else
{
MergeTreeRangeReader * pre_reader_ptr = nullptr;
if (pre_reader != nullptr)
{
current_task.pre_range_reader = MergeTreeRangeReader(
pre_reader.get(), nullptr,
prewhere_alias_actions,
prewhere_actions,
prewhere_info->prewhere_column_name,
prewhere_info->remove_prewhere_column,
prewhere_info->need_filter,
false);
current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, false);
pre_reader_ptr = &current_task.pre_range_reader;
}
current_task.range_reader = MergeTreeRangeReader(
reader.get(), pre_reader_ptr, nullptr, nullptr, {}, false, false, true);
current_task.range_reader = MergeTreeRangeReader(reader.get(), pre_reader_ptr, nullptr, true);
}
}
else
{
current_task.range_reader = MergeTreeRangeReader(
reader.get(), nullptr, nullptr, nullptr, {}, false, false, true);
current_task.range_reader = MergeTreeRangeReader(reader.get(), nullptr, nullptr, true);
}
}
@ -337,9 +314,9 @@ void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const P
if (prewhere_info)
{
if (prewhere_info->alias_actions)
prewhere_info->alias_actions->buildExpressions()->execute(block);
prewhere_info->alias_actions->execute(block);
prewhere_info->prewhere_actions->buildExpressions()->execute(block);
prewhere_info->prewhere_actions->execute(block);
auto & prewhere_column = block.getByName(prewhere_info->prewhere_column_name);
if (!prewhere_column.type->canBeUsedInBooleanContext())

View File

@ -58,8 +58,6 @@ protected:
StorageMetadataPtr metadata_snapshot;
PrewhereInfoPtr prewhere_info;
ExpressionActionsPtr prewhere_alias_actions;
ExpressionActionsPtr prewhere_actions;
UInt64 max_block_size_rows;
UInt64 preferred_block_size_bytes;

View File

@ -489,20 +489,11 @@ size_t MergeTreeRangeReader::ReadResult::countBytesInResultFilter(const IColumn:
MergeTreeRangeReader::MergeTreeRangeReader(
IMergeTreeReader * merge_tree_reader_,
MergeTreeRangeReader * prev_reader_,
ExpressionActionsPtr prewhere_alias_actions_,
ExpressionActionsPtr prewhere_actions_,
String prewhere_column_name_,
bool remove_prewhere_column_,
bool prewhere_need_filter_,
const PrewhereInfoPtr & prewhere_,
bool last_reader_in_chain_)
: merge_tree_reader(merge_tree_reader_)
, index_granularity(&(merge_tree_reader->data_part->index_granularity)), prev_reader(prev_reader_)
, prewhere_alias_actions(std::move(prewhere_alias_actions_))
, prewhere_actions(std::move(prewhere_actions_))
, prewhere_column_name(std::move(prewhere_column_name_))
, remove_prewhere_column(remove_prewhere_column_)
, prewhere_need_filter(prewhere_need_filter_)
, last_reader_in_chain(last_reader_in_chain_), is_initialized(true)
, prewhere(prewhere_), last_reader_in_chain(last_reader_in_chain_), is_initialized(true)
{
if (prev_reader)
sample_block = prev_reader->getSampleBlock();
@ -510,16 +501,16 @@ MergeTreeRangeReader::MergeTreeRangeReader(
for (const auto & name_and_type : merge_tree_reader->getColumns())
sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name});
if (prewhere_actions)
if (prewhere)
{
if (prewhere_alias_actions)
prewhere_alias_actions->execute(sample_block, true);
if (prewhere->alias_actions)
prewhere->alias_actions->execute(sample_block, true);
if (prewhere_actions)
prewhere_actions->execute(sample_block, true);
if (prewhere->prewhere_actions)
prewhere->prewhere_actions->execute(sample_block, true);
if (remove_prewhere_column)
sample_block.erase(prewhere_column_name);
if (prewhere->remove_prewhere_column)
sample_block.erase(prewhere->prewhere_column_name);
}
}
@ -803,7 +794,7 @@ Columns MergeTreeRangeReader::continueReadingChain(ReadResult & result, size_t &
void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & result)
{
if (!prewhere_actions)
if (!prewhere)
return;
const auto & header = merge_tree_reader->getColumns();
@ -834,14 +825,14 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
for (auto name_and_type = header.begin(); pos < num_columns; ++pos, ++name_and_type)
block.insert({result.columns[pos], name_and_type->type, name_and_type->name});
if (prewhere_alias_actions)
prewhere_alias_actions->execute(block);
if (prewhere->alias_actions)
prewhere->alias_actions->execute(block);
/// Columns might be projected out. We need to store them here so that default columns can be evaluated later.
result.block_before_prewhere = block;
prewhere_actions->execute(block);
prewhere->prewhere_actions->execute(block);
prewhere_column_pos = block.getPositionByName(prewhere_column_name);
prewhere_column_pos = block.getPositionByName(prewhere->prewhere_column_name);
result.columns.clear();
result.columns.reserve(block.columns());
@ -869,7 +860,7 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
if (result.totalRowsPerGranule() == 0)
result.setFilterConstFalse();
/// If we need to filter in PREWHERE
else if (prewhere_need_filter || result.need_filter)
else if (prewhere->need_filter || result.need_filter)
{
/// If there is a filter and without optimized
if (result.getFilter() && last_reader_in_chain)
@ -910,11 +901,11 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
/// Check if the PREWHERE column is needed
if (!result.columns.empty())
{
if (remove_prewhere_column)
if (prewhere->remove_prewhere_column)
result.columns.erase(result.columns.begin() + prewhere_column_pos);
else
result.columns[prewhere_column_pos] =
getSampleBlock().getByName(prewhere_column_name).type->
getSampleBlock().getByName(prewhere->prewhere_column_name).type->
createColumnConst(result.num_rows, 1u)->convertToFullColumnIfConst();
}
}
@ -922,7 +913,7 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
else
{
result.columns[prewhere_column_pos] = result.getFilterHolder()->convertToFullColumnIfConst();
if (getSampleBlock().getByName(prewhere_column_name).type->isNullable())
if (getSampleBlock().getByName(prewhere->prewhere_column_name).type->isNullable())
result.columns[prewhere_column_pos] = makeNullable(std::move(result.columns[prewhere_column_pos]));
result.clearFilter(); // Acting as a flag to not filter in PREWHERE
}

View File

@ -24,11 +24,7 @@ public:
MergeTreeRangeReader(
IMergeTreeReader * merge_tree_reader_,
MergeTreeRangeReader * prev_reader_,
ExpressionActionsPtr prewhere_alias_actions_,
ExpressionActionsPtr prewhere_actions_,
String prewhere_column_name_,
bool remove_prewhere_column_,
bool prewhere_need_filter_,
const PrewhereInfoPtr & prewhere_,
bool last_reader_in_chain_);
MergeTreeRangeReader() = default;
@ -221,12 +217,7 @@ private:
IMergeTreeReader * merge_tree_reader = nullptr;
const MergeTreeIndexGranularity * index_granularity = nullptr;
MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader firstly.
ExpressionActionsPtr prewhere_alias_actions;
ExpressionActionsPtr prewhere_actions;
String prewhere_column_name;
bool remove_prewhere_column;
bool prewhere_need_filter;
PrewhereInfoPtr prewhere;
Stream stream;

View File

@ -18,20 +18,35 @@ using ActionsDAGPtr = std::shared_ptr<ActionsDAG>;
struct PrewhereInfo
{
/// Actions which are executed in order to alias columns are used for prewhere actions.
ActionsDAGPtr alias_actions;
ExpressionActionsPtr alias_actions;
/// Actions which are executed on block in order to get filter column for prewhere step.
ActionsDAGPtr prewhere_actions;
ExpressionActionsPtr prewhere_actions;
/// Actions which are executed after reading from storage in order to remove unused columns.
ActionsDAGPtr remove_columns_actions;
ExpressionActionsPtr remove_columns_actions;
String prewhere_column_name;
bool remove_prewhere_column = false;
bool need_filter = false;
PrewhereInfo() = default;
explicit PrewhereInfo(ActionsDAGPtr prewhere_actions_, String prewhere_column_name_)
explicit PrewhereInfo(ExpressionActionsPtr prewhere_actions_, String prewhere_column_name_)
: prewhere_actions(std::move(prewhere_actions_)), prewhere_column_name(std::move(prewhere_column_name_)) {}
};
/// Same as PrewhereInfo, but with ActionsDAG
struct PrewhereDAGInfo
{
ActionsDAGPtr alias_actions;
ActionsDAGPtr prewhere_actions;
ActionsDAGPtr remove_columns_actions;
String prewhere_column_name;
bool remove_prewhere_column = false;
bool need_filter = false;
PrewhereDAGInfo() = default;
explicit PrewhereDAGInfo(ActionsDAGPtr prewhere_actions_, String prewhere_column_name_)
: prewhere_actions(std::move(prewhere_actions_)), prewhere_column_name(std::move(prewhere_column_name_)) {}
};
/// Helper struct to store all the information about the filter expression.
struct FilterInfo
{
@ -57,6 +72,7 @@ struct InputOrderInfo
};
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
using PrewhereDAGInfoPtr = std::shared_ptr<PrewhereDAGInfo>;
using FilterInfoPtr = std::shared_ptr<FilterInfo>;
using InputOrderInfoPtr = std::shared_ptr<const InputOrderInfo>;