support prewhere, row_filter, read_in_order and decent projection selection

TODO set index analysis in projection
This commit is contained in:
Amos Bird 2021-05-02 21:38:19 +08:00
parent f7f949c1f9
commit 9c069ebdbf
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
11 changed files with 1365 additions and 1061 deletions

View File

@ -477,6 +477,11 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
}
addPrewhereAliasActions();
query_info.syntax_analyzer_result = syntax_analyzer_result;
query_info.required_columns = required_columns;
source_header = metadata_snapshot->getSampleBlockForColumns(required_columns, storage->getVirtuals(), storage->getStorageID());
}
@ -589,6 +594,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
{
from_stage = storage->getQueryProcessingStage(context, options.to_stage, metadata_snapshot, query_info);
/// TODO how can we make IN index work if we cache parts before selecting a projection?
/// XXX Used for IN set index analysis. Is this a proper way?
if (query_info.projection)
metadata_snapshot->selected_projection = query_info.projection->desc;
@ -1043,7 +1049,10 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
&& !expressions.has_window)
{
if (expressions.has_order_by)
executeOrder(query_plan, query_info.input_order_info);
executeOrder(
query_plan,
query_info.input_order_info ? query_info.input_order_info
: (query_info.projection ? query_info.projection->input_order_info : nullptr));
if (expressions.has_order_by && query.limitLength())
executeDistinct(query_plan, false, expressions.selected_columns, true);
@ -1169,9 +1178,24 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (expressions.need_aggregate)
{
executeAggregation(query_plan, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.input_order_info);
/// We need to reset input order info, so that executeOrder can't use it
query_info.input_order_info.reset();
if (query_info.projection)
{
executeAggregation(
query_plan,
expressions.before_aggregation,
aggregate_overflow_row,
aggregate_final,
query_info.projection->input_order_info);
/// We need to reset input order info, so that executeOrder can't use it
query_info.projection->input_order_info.reset();
}
else
{
executeAggregation(
query_plan, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.input_order_info);
/// We need to reset input order info, so that executeOrder can't use it
query_info.input_order_info.reset();
}
}
// Now we must execute:
@ -1301,7 +1325,10 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
else if (!expressions.first_stage && !expressions.need_aggregate && !(query.group_by_with_totals && !aggregate_final))
executeMergeSorted(query_plan, "for ORDER BY");
else /// Otherwise, just sort.
executeOrder(query_plan, query_info.input_order_info);
executeOrder(
query_plan,
query_info.input_order_info ? query_info.input_order_info
: (query_info.projection ? query_info.projection->input_order_info : nullptr));
}
/** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT,
@ -1485,13 +1512,168 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(
query_plan.addStep(std::move(read_from_pipe));
}
void InterpreterSelectQuery::addPrewhereAliasActions()
{
auto & prewhere_info = analysis_result.prewhere_info;
auto & columns_to_remove_after_prewhere = analysis_result.columns_to_remove_after_prewhere;
/// Detect, if ALIAS columns are required for query execution
auto alias_columns_required = false;
const ColumnsDescription & storage_columns = metadata_snapshot->getColumns();
for (const auto & column_name : required_columns)
{
auto column_default = storage_columns.getDefault(column_name);
if (column_default && column_default->kind == ColumnDefaultKind::Alias)
{
alias_columns_required = true;
break;
}
}
/// There are multiple sources of required columns:
/// - raw required columns,
/// - columns deduced from ALIAS columns,
/// - raw required columns from PREWHERE,
/// - columns deduced from ALIAS columns from PREWHERE.
/// PREWHERE is a special case, since we need to resolve it and pass directly to `IStorage::read()`
/// before any other executions.
if (alias_columns_required)
{
NameSet required_columns_from_prewhere; /// Set of all (including ALIAS) required columns for PREWHERE
NameSet required_aliases_from_prewhere; /// Set of ALIAS required columns for PREWHERE
if (prewhere_info)
{
/// Get some columns directly from PREWHERE expression actions
auto prewhere_required_columns = prewhere_info->prewhere_actions->getRequiredColumns().getNames();
required_columns_from_prewhere.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
if (prewhere_info->row_level_filter_actions)
{
auto row_level_required_columns = prewhere_info->row_level_filter_actions->getRequiredColumns().getNames();
required_columns_from_prewhere.insert(row_level_required_columns.begin(), row_level_required_columns.end());
}
}
/// Expression, that contains all raw required columns
ASTPtr required_columns_all_expr = std::make_shared<ASTExpressionList>();
/// Expression, that contains raw required columns for PREWHERE
ASTPtr required_columns_from_prewhere_expr = std::make_shared<ASTExpressionList>();
/// Sort out already known required columns between expressions,
/// also populate `required_aliases_from_prewhere`.
for (const auto & column : required_columns)
{
ASTPtr column_expr;
const auto column_default = storage_columns.getDefault(column);
bool is_alias = column_default && column_default->kind == ColumnDefaultKind::Alias;
if (is_alias)
{
auto column_decl = storage_columns.get(column);
column_expr = column_default->expression->clone();
// recursive visit for alias to alias
replaceAliasColumnsInQuery(
column_expr, metadata_snapshot->getColumns(), syntax_analyzer_result->getArrayJoinSourceNameSet(), context);
column_expr = addTypeConversionToAST(
std::move(column_expr), column_decl.type->getName(), metadata_snapshot->getColumns().getAll(), context);
column_expr = setAlias(column_expr, column);
}
else
column_expr = std::make_shared<ASTIdentifier>(column);
if (required_columns_from_prewhere.count(column))
{
required_columns_from_prewhere_expr->children.emplace_back(std::move(column_expr));
if (is_alias)
required_aliases_from_prewhere.insert(column);
}
else
required_columns_all_expr->children.emplace_back(std::move(column_expr));
}
/// Columns, which we will get after prewhere and filter executions.
NamesAndTypesList required_columns_after_prewhere;
NameSet required_columns_after_prewhere_set;
/// Collect required columns from prewhere expression actions.
if (prewhere_info)
{
NameSet columns_to_remove(columns_to_remove_after_prewhere.begin(), columns_to_remove_after_prewhere.end());
Block prewhere_actions_result = prewhere_info->prewhere_actions->getResultColumns();
/// Populate required columns with the columns, added by PREWHERE actions and not removed afterwards.
/// XXX: looks hacky that we already know which columns after PREWHERE we won't need for sure.
for (const auto & column : prewhere_actions_result)
{
if (prewhere_info->remove_prewhere_column && column.name == prewhere_info->prewhere_column_name)
continue;
if (columns_to_remove.count(column.name))
continue;
required_columns_all_expr->children.emplace_back(std::make_shared<ASTIdentifier>(column.name));
required_columns_after_prewhere.emplace_back(column.name, column.type);
}
required_columns_after_prewhere_set
= ext::map<NameSet>(required_columns_after_prewhere, [](const auto & it) { return it.name; });
}
auto syntax_result
= TreeRewriter(context).analyze(required_columns_all_expr, required_columns_after_prewhere, storage, metadata_snapshot);
alias_actions = ExpressionAnalyzer(required_columns_all_expr, syntax_result, context).getActionsDAG(true);
/// The set of required columns could be added as a result of adding an action to calculate ALIAS.
required_columns = alias_actions->getRequiredColumns().getNames();
/// Do not remove prewhere filter if it is a column which is used as alias.
if (prewhere_info && prewhere_info->remove_prewhere_column)
if (required_columns.end() != std::find(required_columns.begin(), required_columns.end(), prewhere_info->prewhere_column_name))
prewhere_info->remove_prewhere_column = false;
/// Remove columns which will be added by prewhere.
required_columns.erase(
std::remove_if(
required_columns.begin(),
required_columns.end(),
[&](const String & name) { return required_columns_after_prewhere_set.count(name) != 0; }),
required_columns.end());
if (prewhere_info)
{
/// Don't remove columns which are needed to be aliased.
for (const auto & name : required_columns)
prewhere_info->prewhere_actions->tryRestoreColumn(name);
auto analyzed_result
= TreeRewriter(context).analyze(required_columns_from_prewhere_expr, metadata_snapshot->getColumns().getAllPhysical());
prewhere_info->alias_actions
= ExpressionAnalyzer(required_columns_from_prewhere_expr, analyzed_result, context).getActionsDAG(true, false);
/// Add (physical?) columns required by alias actions.
auto required_columns_from_alias = prewhere_info->alias_actions->getRequiredColumns();
Block prewhere_actions_result = prewhere_info->prewhere_actions->getResultColumns();
for (auto & column : required_columns_from_alias)
if (!prewhere_actions_result.has(column.name))
if (required_columns.end() == std::find(required_columns.begin(), required_columns.end(), column.name))
required_columns.push_back(column.name);
/// Add physical columns required by prewhere actions.
for (const auto & column : required_columns_from_prewhere)
if (required_aliases_from_prewhere.count(column) == 0)
if (required_columns.end() == std::find(required_columns.begin(), required_columns.end(), column))
required_columns.push_back(column);
}
}
}
void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan)
{
auto & query = getSelectQuery();
const Settings & settings = context->getSettingsRef();
auto & expressions = analysis_result;
auto & prewhere_info = expressions.prewhere_info;
auto & columns_to_remove_after_prewhere = expressions.columns_to_remove_after_prewhere;
/// Optimization for trivial query like SELECT count() FROM table.
bool optimize_trivial_count =
@ -1560,160 +1742,6 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
}
}
/// Actions to calculate ALIAS if required.
ActionsDAGPtr alias_actions;
if (storage)
{
/// Detect, if ALIAS columns are required for query execution
auto alias_columns_required = false;
const ColumnsDescription & storage_columns = metadata_snapshot->getColumns();
for (const auto & column_name : required_columns)
{
auto column_default = storage_columns.getDefault(column_name);
if (column_default && column_default->kind == ColumnDefaultKind::Alias)
{
alias_columns_required = true;
break;
}
}
/// There are multiple sources of required columns:
/// - raw required columns,
/// - columns deduced from ALIAS columns,
/// - raw required columns from PREWHERE,
/// - columns deduced from ALIAS columns from PREWHERE.
/// PREWHERE is a special case, since we need to resolve it and pass directly to `IStorage::read()`
/// before any other executions.
if (alias_columns_required)
{
NameSet required_columns_from_prewhere; /// Set of all (including ALIAS) required columns for PREWHERE
NameSet required_aliases_from_prewhere; /// Set of ALIAS required columns for PREWHERE
if (prewhere_info)
{
/// Get some columns directly from PREWHERE expression actions
auto prewhere_required_columns = prewhere_info->prewhere_actions->getRequiredColumns().getNames();
required_columns_from_prewhere.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
if (prewhere_info->row_level_filter_actions)
{
auto row_level_required_columns = prewhere_info->row_level_filter_actions->getRequiredColumns().getNames();
required_columns_from_prewhere.insert(row_level_required_columns.begin(), row_level_required_columns.end());
}
}
/// Expression, that contains all raw required columns
ASTPtr required_columns_all_expr = std::make_shared<ASTExpressionList>();
/// Expression, that contains raw required columns for PREWHERE
ASTPtr required_columns_from_prewhere_expr = std::make_shared<ASTExpressionList>();
/// Sort out already known required columns between expressions,
/// also populate `required_aliases_from_prewhere`.
for (const auto & column : required_columns)
{
ASTPtr column_expr;
const auto column_default = storage_columns.getDefault(column);
bool is_alias = column_default && column_default->kind == ColumnDefaultKind::Alias;
if (is_alias)
{
auto column_decl = storage_columns.get(column);
column_expr = column_default->expression->clone();
// recursive visit for alias to alias
replaceAliasColumnsInQuery(column_expr, metadata_snapshot->getColumns(), syntax_analyzer_result->getArrayJoinSourceNameSet(), context);
column_expr = addTypeConversionToAST(std::move(column_expr), column_decl.type->getName(), metadata_snapshot->getColumns().getAll(), context);
column_expr = setAlias(column_expr, column);
}
else
column_expr = std::make_shared<ASTIdentifier>(column);
if (required_columns_from_prewhere.count(column))
{
required_columns_from_prewhere_expr->children.emplace_back(std::move(column_expr));
if (is_alias)
required_aliases_from_prewhere.insert(column);
}
else
required_columns_all_expr->children.emplace_back(std::move(column_expr));
}
/// Columns, which we will get after prewhere and filter executions.
NamesAndTypesList required_columns_after_prewhere;
NameSet required_columns_after_prewhere_set;
/// Collect required columns from prewhere expression actions.
if (prewhere_info)
{
NameSet columns_to_remove(columns_to_remove_after_prewhere.begin(), columns_to_remove_after_prewhere.end());
Block prewhere_actions_result = prewhere_info->prewhere_actions->getResultColumns();
/// Populate required columns with the columns, added by PREWHERE actions and not removed afterwards.
/// XXX: looks hacky that we already know which columns after PREWHERE we won't need for sure.
for (const auto & column : prewhere_actions_result)
{
if (prewhere_info->remove_prewhere_column && column.name == prewhere_info->prewhere_column_name)
continue;
if (columns_to_remove.count(column.name))
continue;
required_columns_all_expr->children.emplace_back(std::make_shared<ASTIdentifier>(column.name));
required_columns_after_prewhere.emplace_back(column.name, column.type);
}
required_columns_after_prewhere_set
= ext::map<NameSet>(required_columns_after_prewhere, [](const auto & it) { return it.name; });
}
auto syntax_result = TreeRewriter(context).analyze(required_columns_all_expr, required_columns_after_prewhere, storage, metadata_snapshot);
alias_actions = ExpressionAnalyzer(required_columns_all_expr, syntax_result, context).getActionsDAG(true);
/// The set of required columns could be added as a result of adding an action to calculate ALIAS.
required_columns = alias_actions->getRequiredColumns().getNames();
/// Do not remove prewhere filter if it is a column which is used as alias.
if (prewhere_info && prewhere_info->remove_prewhere_column)
if (required_columns.end()
!= std::find(required_columns.begin(), required_columns.end(), prewhere_info->prewhere_column_name))
prewhere_info->remove_prewhere_column = false;
/// Remove columns which will be added by prewhere.
required_columns.erase(std::remove_if(required_columns.begin(), required_columns.end(), [&](const String & name)
{
return required_columns_after_prewhere_set.count(name) != 0;
}), required_columns.end());
if (prewhere_info)
{
/// Don't remove columns which are needed to be aliased.
for (const auto & name : required_columns)
prewhere_info->prewhere_actions->tryRestoreColumn(name);
auto analyzed_result
= TreeRewriter(context).analyze(required_columns_from_prewhere_expr, metadata_snapshot->getColumns().getAllPhysical());
prewhere_info->alias_actions
= ExpressionAnalyzer(required_columns_from_prewhere_expr, analyzed_result, context).getActionsDAG(true, false);
/// Add (physical?) columns required by alias actions.
auto required_columns_from_alias = prewhere_info->alias_actions->getRequiredColumns();
Block prewhere_actions_result = prewhere_info->prewhere_actions->getResultColumns();
for (auto & column : required_columns_from_alias)
if (!prewhere_actions_result.has(column.name))
if (required_columns.end() == std::find(required_columns.begin(), required_columns.end(), column.name))
required_columns.push_back(column.name);
/// Add physical columns required by prewhere actions.
for (const auto & column : required_columns_from_prewhere)
if (required_aliases_from_prewhere.count(column) == 0)
if (required_columns.end() == std::find(required_columns.begin(), required_columns.end(), column))
required_columns.push_back(column);
}
}
}
/// Limitation on the number of columns to read.
/// It's not applied in 'only_analyze' mode, because the query could be analyzed without removal of unnecessary columns.
if (!options.only_analyze && settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read)
@ -1804,9 +1832,10 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
if (max_streams > 1 && !is_remote)
max_streams *= settings.max_streams_to_max_threads_ratio;
query_info.syntax_analyzer_result = syntax_analyzer_result;
// TODO figure out how to make set for projections
query_info.sets = query_analyzer->getPreparedSets();
auto actions_settings = ExpressionActionsSettings::fromContext(context);
auto & prewhere_info = analysis_result.prewhere_info;
if (prewhere_info)
{
@ -1828,20 +1857,46 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
/// Create optimizer with prepared actions.
/// Maybe we will need to calc input_order_info later, e.g. while reading from StorageMerge.
if ((analysis_result.optimize_read_in_order || analysis_result.optimize_aggregation_in_order) && !query_info.projection)
if ((analysis_result.optimize_read_in_order || analysis_result.optimize_aggregation_in_order)
&& (!query_info.projection || query_info.projection->complete))
{
if (analysis_result.optimize_read_in_order)
query_info.order_optimizer = std::make_shared<ReadInOrderOptimizer>(
analysis_result.order_by_elements_actions,
getSortDescription(query, context),
query_info.syntax_analyzer_result);
{
if (query_info.projection)
{
query_info.projection->order_optimizer = std::make_shared<ReadInOrderOptimizer>(
// TODO Do we need a projection variant for this field?
analysis_result.order_by_elements_actions,
getSortDescription(query, context),
query_info.syntax_analyzer_result);
}
else
{
query_info.order_optimizer = std::make_shared<ReadInOrderOptimizer>(
analysis_result.order_by_elements_actions, getSortDescription(query, context), query_info.syntax_analyzer_result);
}
}
else
query_info.order_optimizer = std::make_shared<ReadInOrderOptimizer>(
analysis_result.group_by_elements_actions,
getSortDescriptionFromGroupBy(query),
query_info.syntax_analyzer_result);
{
if (query_info.projection)
{
query_info.projection->order_optimizer = std::make_shared<ReadInOrderOptimizer>(
query_info.projection->group_by_elements_actions,
getSortDescriptionFromGroupBy(query),
query_info.syntax_analyzer_result);
}
else
{
query_info.order_optimizer = std::make_shared<ReadInOrderOptimizer>(
analysis_result.group_by_elements_actions, getSortDescriptionFromGroupBy(query), query_info.syntax_analyzer_result);
}
}
query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context);
if (query_info.projection)
query_info.projection->input_order_info
= query_info.projection->order_optimizer->getInputOrder(query_info.projection->desc->metadata, context);
else
query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context);
}
StreamLocalLimits limits;
@ -2504,8 +2559,11 @@ void InterpreterSelectQuery::executeExtremes(QueryPlan & query_plan)
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPlan & query_plan, SubqueriesForSets & subqueries_for_sets)
{
if (query_info.input_order_info)
executeMergeSorted(query_plan, query_info.input_order_info->order_key_prefix_descr, 0, "before creating sets for subqueries and joins");
const auto & input_order_info = query_info.input_order_info
? query_info.input_order_info
: (query_info.projection ? query_info.projection->input_order_info : nullptr);
if (input_order_info)
executeMergeSorted(query_plan, input_order_info->order_key_prefix_descr, 0, "before creating sets for subqueries and joins");
const Settings & settings = context->getSettingsRef();

View File

@ -117,6 +117,8 @@ private:
ASTSelectQuery & getSelectQuery() { return query_ptr->as<ASTSelectQuery &>(); }
void addPrewhereAliasActions();
Block getSampleBlockImpl();
void executeImpl(QueryPlan & query_plan, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe);
@ -183,6 +185,9 @@ private:
/// Structure of query source (table, subquery, etc).
Block source_header;
/// Actions to calculate ALIAS if required.
ActionsDAGPtr alias_actions;
/// The subquery interpreter, if the subquery
std::unique_ptr<InterpreterSelectWithUnionQuery> interpreter_subquery;

View File

@ -33,7 +33,6 @@
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Storages/AlterCommands.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataUtils.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
@ -3800,6 +3799,358 @@ bool MergeTreeData::mayBenefitFromIndexForIn(
}
}
using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
static void selectBestProjection(
const MergeTreeDataSelectExecutor & reader,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
ProjectionCandidate & candidate,
ContextPtr query_context,
const PartitionIdToMaxBlock * max_added_blocks,
const Settings & settings,
const MergeTreeData::DataPartsVector & parts,
ProjectionCandidate *& selected_candidate,
size_t & min_sum_marks)
{
MergeTreeData::DataPartsVector projection_parts;
MergeTreeData::DataPartsVector normal_parts;
for (const auto & part : parts)
{
const auto & projections = part->getProjectionParts();
auto it = projections.find(candidate.desc->name);
if (it != projections.end())
projection_parts.push_back(it->second);
else
normal_parts.push_back(part);
}
if (projection_parts.empty())
return;
candidate.merge_tree_data_select_base_cache = std::make_unique<MergeTreeDataSelectCache>();
candidate.merge_tree_data_select_projection_cache = std::make_unique<MergeTreeDataSelectCache>();
reader.readFromParts(
projection_parts,
candidate.required_columns,
metadata_snapshot,
candidate.desc->metadata,
query_info, // TODO syntax_analysis_result set in index
query_context,
0, // max_block_size is unused when getting cache
settings.max_threads,
max_added_blocks,
candidate.merge_tree_data_select_projection_cache.get());
size_t sum_marks = candidate.merge_tree_data_select_projection_cache->sum_marks;
if (normal_parts.empty())
{
// All parts are projection parts which allows us to use in_order_optimization.
// TODO It might be better to use a complete projection even with more marks to read.
candidate.complete = true;
}
else
{
reader.readFromParts(
normal_parts,
query_info.required_columns,
metadata_snapshot,
metadata_snapshot,
query_info, // TODO syntax_analysis_result set in index
query_context,
0, // max_block_size is unused when getting cache
settings.max_threads,
max_added_blocks,
candidate.merge_tree_data_select_base_cache.get());
sum_marks += candidate.merge_tree_data_select_base_cache->sum_marks;
}
// We choose the projection with least sum_marks to read.
if (sum_marks < min_sum_marks)
{
selected_candidate = &candidate;
min_sum_marks = sum_marks;
}
}
bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info) const
{
const auto & settings = query_context->getSettingsRef();
if (!settings.allow_experimental_projection_optimization || query_info.ignore_projections)
return false;
const auto & query_ptr = query_info.query;
InterpreterSelectQuery select(
query_ptr, query_context, SelectQueryOptions{QueryProcessingStage::WithMergeableState}.ignoreProjections().ignoreAlias());
const auto & analysis_result = select.getAnalysisResult();
bool can_use_aggregate_projection = true;
/// If the first stage of the query pipeline is more complex than Aggregating - Expression - Filter - ReadFromStorage,
/// we cannot use aggregate projection.
if (analysis_result.join != nullptr || analysis_result.array_join != nullptr)
can_use_aggregate_projection = false;
/// Check if all needed columns can be provided by some aggregate projection. Here we also try
/// to find expression matches. For example, suppose an aggregate projection contains a column
/// named sum(x) and the given query also has an expression called sum(x), it's a match. This is
/// why we need to ignore all aliases during projection creation and the above query planning.
/// It's also worth noting that, sqrt(sum(x)) will also work because we can treat sum(x) as a
/// required column.
/// The ownership of ProjectionDescription is hold in metadata_snapshot which lives along with
/// InterpreterSelect, thus we can store the raw pointer here.
std::vector<ProjectionCandidate> candidates;
NameSet keys;
std::unordered_map<std::string_view, size_t> key_name_pos_map;
size_t pos = 0;
for (const auto & desc : select.getQueryAnalyzer()->aggregationKeys())
{
keys.insert(desc.name);
key_name_pos_map.insert({desc.name, pos++});
}
auto actions_settings = ExpressionActionsSettings::fromSettings(settings);
// All required columns should be provided by either current projection or previous actions
// Let's traverse backward to finish the check.
// TODO what if there is a column with name sum(x) and an aggregate sum(x)?
auto rewrite_before_where =
[&](ProjectionCandidate & candidate, const ProjectionDescription & projection,
NameSet & required_columns, const Block & source_block, const Block & aggregates)
{
if (analysis_result.before_where)
{
candidate.before_where = analysis_result.before_where->clone();
required_columns = candidate.before_where->foldActionsByProjection(
required_columns,
projection.sample_block_for_keys,
query_ptr->as<const ASTSelectQuery &>().where()->getColumnName());
if (required_columns.empty())
return false;
candidate.before_where->addAggregatesViaProjection(aggregates);
}
if (analysis_result.prewhere_info)
{
auto & prewhere_info = analysis_result.prewhere_info;
candidate.prewhere_info = std::make_shared<PrewhereInfo>();
candidate.prewhere_info->prewhere_column_name = prewhere_info->prewhere_column_name;
candidate.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column;
candidate.prewhere_info->row_level_column_name = prewhere_info->row_level_column_name;
candidate.prewhere_info->need_filter = prewhere_info->need_filter;
auto prewhere_actions = prewhere_info->prewhere_actions->clone();
NameSet prewhere_required_columns;
prewhere_required_columns = prewhere_actions->foldActionsByProjection(
prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->prewhere_column_name);
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->prewhere_actions = std::make_shared<ExpressionActions>(prewhere_actions, actions_settings);
if (prewhere_info->row_level_filter_actions)
{
auto row_level_filter_actions = prewhere_info->row_level_filter_actions->clone();
prewhere_required_columns = row_level_filter_actions->foldActionsByProjection(
prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->row_level_column_name);
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->row_level_filter
= std::make_shared<ExpressionActions>(row_level_filter_actions, actions_settings);
}
// TODO wait for alias analysis to be moved into expression analyzer
if (prewhere_info->alias_actions)
{
auto alias_actions = prewhere_info->alias_actions->clone();
prewhere_required_columns
= alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys);
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->alias_actions = std::make_shared<ExpressionActions>(alias_actions, actions_settings);
}
required_columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
}
bool match = true;
for (const auto & column : required_columns)
{
/// There are still missing columns, fail to match
if (!source_block.has(column))
{
match = false;
break;
}
}
return match;
};
for (const auto & projection : metadata_snapshot->projections)
{
ProjectionCandidate candidate{};
candidate.desc = &projection;
if (projection.type == ProjectionDescription::Type::Aggregate && analysis_result.need_aggregate && can_use_aggregate_projection)
{
bool match = true;
Block aggregates;
// Let's first check if all aggregates are provided by current projection
for (const auto & aggregate : select.getQueryAnalyzer()->aggregates())
{
const auto * column = projection.sample_block.findByName(aggregate.column_name);
if (column)
{
aggregates.insert(*column);
}
else
{
match = false;
break;
}
}
if (!match)
continue;
// Check if all aggregation keys can be either provided by some action, or by current
// projection directly. Reshape the `before_aggregation` action DAG so that it only
// needs to provide aggregation keys, and certain children DAG might be substituted by
// some keys in projection.
candidate.before_aggregation = analysis_result.before_aggregation->clone();
auto required_columns = candidate.before_aggregation->foldActionsByProjection(keys, projection.sample_block_for_keys);
if (required_columns.empty())
continue;
if (analysis_result.optimize_aggregation_in_order)
{
for (const auto & key : keys)
{
auto actions_dag = analysis_result.before_aggregation->clone();
actions_dag->foldActionsByProjection({key}, projection.sample_block_for_keys);
candidate.group_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(actions_dag, actions_settings));
}
}
// Reorder aggregation keys and attach aggregates
candidate.before_aggregation->reorderAggregationKeysForProjection(key_name_pos_map);
candidate.before_aggregation->addAggregatesViaProjection(aggregates);
if (rewrite_before_where(candidate, projection, required_columns, projection.sample_block_for_keys, aggregates))
{
candidate.required_columns = {required_columns.begin(), required_columns.end()};
for (const auto & aggregate : aggregates)
candidate.required_columns.push_back(aggregate.name);
candidates.push_back(std::move(candidate));
}
}
if (projection.type == ProjectionDescription::Type::Normal && (analysis_result.hasWhere() || analysis_result.hasPrewhere()))
{
// TODO is it possible?
if (!analysis_result.before_order_by)
continue;
NameSet required_columns;
for (const auto & column : analysis_result.before_order_by->getRequiredColumns())
required_columns.insert(column.name);
if (rewrite_before_where(candidate, projection, required_columns, projection.sample_block, {}))
{
candidate.required_columns = {required_columns.begin(), required_columns.end()};
candidates.push_back(std::move(candidate));
}
}
}
// Let's select the best projection to execute the query.
if (!candidates.empty())
{
// First build a MergeTreeDataSelectCache to check if a projection is indeed better than base
query_info.merge_tree_data_select_cache = std::make_unique<MergeTreeDataSelectCache>();
std::unique_ptr<PartitionIdToMaxBlock> max_added_blocks;
if (settings.select_sequential_consistency)
{
if (const StorageReplicatedMergeTree * replicated = dynamic_cast<const StorageReplicatedMergeTree *>(this))
max_added_blocks = std::make_unique<PartitionIdToMaxBlock>(replicated->getMaxAddedBlocks());
}
auto parts = getDataPartsVector();
MergeTreeDataSelectExecutor reader(*this);
reader.readFromParts(
parts,
query_info.required_columns,
metadata_snapshot,
metadata_snapshot,
query_info, // TODO syntax_analysis_result set in index
query_context,
0, // max_block_size is unused when getting cache
settings.max_threads,
max_added_blocks.get(),
query_info.merge_tree_data_select_cache.get());
size_t min_sum_marks = query_info.merge_tree_data_select_cache->sum_marks;
ProjectionCandidate * selected_candidate = nullptr;
/// Favor aggregate projections
for (auto & candidate : candidates)
{
if (candidate.desc->type == ProjectionDescription::Type::Aggregate)
{
selectBestProjection(
reader,
metadata_snapshot,
query_info,
candidate,
query_context,
max_added_blocks.get(),
settings,
parts,
selected_candidate,
min_sum_marks);
}
}
/// Select the best normal projection if no aggregate projection is available
if (!selected_candidate)
{
for (auto & candidate : candidates)
{
if (candidate.desc->type == ProjectionDescription::Type::Normal)
{
selectBestProjection(
reader,
metadata_snapshot,
query_info,
candidate,
query_context,
max_added_blocks.get(),
settings,
parts,
selected_candidate,
min_sum_marks);
}
}
}
if (!selected_candidate)
return false;
if (selected_candidate->desc->type == ProjectionDescription::Type::Aggregate)
{
selected_candidate->aggregation_keys = select.getQueryAnalyzer()->aggregationKeys();
selected_candidate->aggregate_descriptions = select.getQueryAnalyzer()->aggregates();
}
query_info.projection = std::move(*selected_candidate);
return true;
}
return false;
}
QueryProcessingStage::Enum MergeTreeData::getQueryProcessingStage(
ContextPtr query_context,

View File

@ -358,6 +358,9 @@ public:
bool attach,
BrokenPartCallback broken_part_callback_ = [](const String &){});
bool getQueryProcessingStageWithAggregateProjection(
ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info) const;
QueryProcessingStage::Enum getQueryProcessingStage(
ContextPtr query_context,
QueryProcessingStage::Enum to_stage,

File diff suppressed because it is too large Load Diff

View File

@ -13,6 +13,22 @@ namespace DB
class KeyCondition;
struct MergeTreeDataSelectSamplingData
{
bool use_sampling;
std::shared_ptr<ASTFunction> filter_function;
ActionsDAGPtr filter_expression;
};
struct MergeTreeDataSelectCache
{
RangesInDataParts parts_with_ranges;
MergeTreeDataSelectSamplingData sampling;
std::unique_ptr<ReadFromMergeTree::IndexStats> index_stats;
size_t sum_marks = 0;
size_t sum_ranges = 0;
bool use_cache = false;
};
/** Executes SELECT queries on data from the merge tree.
*/
@ -36,18 +52,17 @@ public:
QueryProcessingStage::Enum processed_stage,
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
QueryPlanPtr readFromParts(
MergeTreeData::DataPartsVector parts,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
ContextPtr context,
UInt64 max_block_size,
unsigned num_streams,
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr,
size_t * num_granules_to_read = nullptr,
bool use_projection_metadata = false) const;
MergeTreeDataSelectCache * cache = nullptr) const;
private:
const MergeTreeData & data;
@ -83,7 +98,8 @@ private:
const Settings & settings,
const MergeTreeReaderSettings & reader_settings,
ActionsDAGPtr & out_projection,
const String & query_id) const;
const String & query_id,
const InputOrderInfoPtr & input_order_info) const;
QueryPlanPtr spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts,

View File

@ -1,228 +0,0 @@
#include <Storages/MergeTree/MergeTreeDataUtils.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectQuery.h>
namespace DB
{
bool getQueryProcessingStageWithAggregateProjection(
ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info)
{
const auto & settings = query_context->getSettingsRef();
if (!settings.allow_experimental_projection_optimization || query_info.ignore_projections)
return false;
const auto & query_ptr = query_info.query;
InterpreterSelectQuery select(
query_ptr, query_context, SelectQueryOptions{QueryProcessingStage::WithMergeableState}.ignoreProjections().ignoreAlias());
const auto & analysis_result = select.getAnalysisResult();
bool can_use_aggregate_projection = true;
/// If the first stage of the query pipeline is more complex than Aggregating - Expression - Filter - ReadFromStorage,
/// we cannot use aggregate projection.
if (analysis_result.join != nullptr || analysis_result.array_join != nullptr)
can_use_aggregate_projection = false;
/// Check if all needed columns can be provided by some aggregate projection. Here we also try
/// to find expression matches. For example, suppose an aggregate projection contains a column
/// named sum(x) and the given query also has an expression called sum(x), it's a match. This is
/// why we need to ignore all aliases during projection creation and the above query planning.
/// It's also worth noting that, sqrt(sum(x)) will also work because we can treat sum(x) as a
/// required column.
/// The ownership of ProjectionDescription is hold in metadata_snapshot which lives along with
/// InterpreterSelect, thus we can store the raw pointer here.
std::vector<ProjectionCandidate> candidates;
NameSet keys;
std::unordered_map<std::string_view, size_t> key_name_pos_map;
size_t pos = 0;
for (const auto & desc : select.getQueryAnalyzer()->aggregationKeys())
{
keys.insert(desc.name);
key_name_pos_map.insert({desc.name, pos++});
}
// All required columns should be provided by either current projection or previous actions
// Let's traverse backward to finish the check.
// TODO what if there is a column with name sum(x) and an aggregate sum(x)?
auto rewrite_before_where =
[&](ProjectionCandidate & candidate, const ProjectionDescription & projection,
NameSet & required_columns, const Block & source_block, const Block & aggregates)
{
if (analysis_result.before_where)
{
candidate.before_where = analysis_result.before_where->clone();
required_columns = candidate.before_where->foldActionsByProjection(
required_columns,
projection.sample_block_for_keys,
query_ptr->as<const ASTSelectQuery &>().where()->getColumnName());
if (required_columns.empty())
return false;
candidate.before_where->addAggregatesViaProjection(aggregates);
}
if (analysis_result.prewhere_info)
{
auto & prewhere_info = analysis_result.prewhere_info;
candidate.prewhere_info = std::make_shared<PrewhereInfo>();
candidate.prewhere_info->prewhere_column_name = prewhere_info->prewhere_column_name;
candidate.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column;
candidate.prewhere_info->row_level_column_name = prewhere_info->row_level_column_name;
candidate.prewhere_info->need_filter = prewhere_info->need_filter;
auto actions_settings = ExpressionActionsSettings::fromSettings(query_context->getSettingsRef());
auto prewhere_actions = prewhere_info->prewhere_actions->clone();
NameSet prewhere_required_columns;
prewhere_required_columns = prewhere_actions->foldActionsByProjection(
prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->prewhere_column_name);
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->prewhere_actions = std::make_shared<ExpressionActions>(prewhere_actions, actions_settings);
if (prewhere_info->row_level_filter_actions)
{
auto row_level_filter_actions = prewhere_info->row_level_filter_actions->clone();
prewhere_required_columns = row_level_filter_actions->foldActionsByProjection(
prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->row_level_column_name);
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->row_level_filter
= std::make_shared<ExpressionActions>(row_level_filter_actions, actions_settings);
}
if (prewhere_info->alias_actions)
{
auto alias_actions = prewhere_info->alias_actions->clone();
prewhere_required_columns
= alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys);
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->alias_actions = std::make_shared<ExpressionActions>(alias_actions, actions_settings);
}
required_columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
}
bool match = true;
for (const auto & column : required_columns)
{
/// There are still missing columns, fail to match
if (!source_block.has(column))
{
match = false;
break;
}
}
return match;
};
for (const auto & projection : metadata_snapshot->projections)
{
ProjectionCandidate candidate{};
candidate.desc = &projection;
if (projection.type == ProjectionDescription::Type::Aggregate && analysis_result.need_aggregate && can_use_aggregate_projection)
{
bool match = true;
Block aggregates;
// Let's first check if all aggregates are provided by current projection
for (const auto & aggregate : select.getQueryAnalyzer()->aggregates())
{
const auto * column = projection.sample_block.findByName(aggregate.column_name);
if (column)
{
aggregates.insert(*column);
}
else
{
match = false;
break;
}
}
if (!match)
continue;
// Check if all aggregation keys can be either provided by some action, or by current
// projection directly. Reshape the `before_aggregation` action DAG so that it only
// needs to provide aggregation keys, and certain children DAG might be substituted by
// some keys in projection.
candidate.before_aggregation = analysis_result.before_aggregation->clone();
auto required_columns = candidate.before_aggregation->foldActionsByProjection(keys, projection.sample_block_for_keys);
if (required_columns.empty())
continue;
// Reorder aggregation keys and attach aggregates
candidate.before_aggregation->reorderAggregationKeysForProjection(key_name_pos_map);
candidate.before_aggregation->addAggregatesViaProjection(aggregates);
if (rewrite_before_where(candidate, projection, required_columns, projection.sample_block_for_keys, aggregates))
{
candidate.required_columns = {required_columns.begin(), required_columns.end()};
for (const auto & aggregate : aggregates)
candidate.required_columns.push_back(aggregate.name);
candidates.push_back(std::move(candidate));
}
}
if (projection.type == ProjectionDescription::Type::Normal && (analysis_result.hasWhere() || analysis_result.hasPrewhere()))
{
NameSet required_columns;
if (analysis_result.hasWhere())
{
for (const auto & column : analysis_result.before_where->getResultColumns())
required_columns.insert(column.name);
}
else
{
for (const auto & column : analysis_result.prewhere_info->prewhere_actions->getResultColumns())
required_columns.insert(column.name);
}
if (rewrite_before_where(candidate, projection, required_columns, projection.sample_block, {}))
{
candidate.required_columns = {required_columns.begin(), required_columns.end()};
candidates.push_back(std::move(candidate));
}
}
}
// Let's select the best aggregate projection to execute the query.
if (!candidates.empty())
{
size_t min_key_size = std::numeric_limits<size_t>::max();
ProjectionCandidate * selected_candidate = nullptr;
/// Favor aggregate projections
for (auto & candidate : candidates)
{
// TODO We choose the projection with least key_size. Perhaps we can do better? (key rollups)
if (candidate.desc->type == ProjectionDescription::Type::Aggregate && candidate.desc->key_size < min_key_size)
{
selected_candidate = &candidate;
min_key_size = candidate.desc->key_size;
}
}
/// TODO Select the best normal projection if no aggregate projection is available
if (!selected_candidate)
{
for (auto & candidate : candidates)
selected_candidate = &candidate;
}
if (!selected_candidate)
return false;
if (selected_candidate->desc->type == ProjectionDescription::Type::Aggregate)
{
selected_candidate->aggregation_keys = select.getQueryAnalyzer()->aggregationKeys();
selected_candidate->aggregate_descriptions = select.getQueryAnalyzer()->aggregates();
}
query_info.projection = std::move(*selected_candidate);
return true;
}
return false;
}
}

View File

@ -1,13 +0,0 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageInMemoryMetadata.h>
namespace DB
{
bool getQueryProcessingStageWithAggregateProjection(
ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info);
}

View File

@ -0,0 +1,75 @@
#pragma once
#include <Core/Defines.h>
#include <Processors/QueryPipeline.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <ext/shared_ptr_helper.h>
namespace DB
{
/// A Storage that allows reading from a single MergeTree data part.
class StorageFromBasePartsOfProjection final : public ext::shared_ptr_helper<StorageFromBasePartsOfProjection>, public IStorage
{
friend struct ext::shared_ptr_helper<StorageFromBasePartsOfProjection>;
public:
String getName() const override { return "FromBasePartsOfProjection"; }
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned num_streams) override
{
// NOTE: It's used to read normal parts only
QueryPlan query_plan = std::move(*MergeTreeDataSelectExecutor(storage).readFromParts(
{},
column_names,
metadata_snapshot,
metadata_snapshot,
query_info,
context,
max_block_size,
num_streams,
nullptr,
query_info.projection ? query_info.projection->merge_tree_data_select_base_cache.get()
: query_info.merge_tree_data_select_cache.get()));
return query_plan.convertToPipe(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
}
bool supportsIndexForIn() const override { return true; }
bool mayBenefitFromIndexForIn(
const ASTPtr & left_in_operand, ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot) const override
{
return storage.mayBenefitFromIndexForIn(left_in_operand, query_context, metadata_snapshot);
}
NamesAndTypesList getVirtuals() const override { return storage.getVirtuals(); }
protected:
StorageFromBasePartsOfProjection(const MergeTreeData & storage_, const StorageMetadataPtr & metadata_snapshot)
: IStorage(storage_.getStorageID()), storage(storage_)
{
setInMemoryMetadata(*metadata_snapshot);
}
private:
const MergeTreeData & storage;
};
}

View File

@ -31,19 +31,23 @@ public:
size_t max_block_size,
unsigned num_streams) override
{
// NOTE: It's used to read normal parts only
QueryPlan query_plan = std::move(*MergeTreeDataSelectExecutor(parts.front()->storage)
.readFromParts(
parts,
column_names,
metadata_snapshot,
metadata_snapshot,
query_info,
context,
max_block_size,
num_streams,
nullptr,
&num_granules_from_last_read));
query_info.projection ? query_info.projection->merge_tree_data_select_base_cache.get()
: query_info.merge_tree_data_select_cache.get()));
return query_plan.convertToPipe(QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
return query_plan.convertToPipe(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
}
@ -70,8 +74,6 @@ public:
return parts.front()->storage.getPartitionIDFromQuery(ast, context);
}
size_t getNumGranulesFromLastRead() const { return num_granules_from_last_read; }
protected:
StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
: IStorage(getIDFromPart(part_))
@ -90,8 +92,6 @@ protected:
private:
MergeTreeData::DataPartsVector parts;
size_t num_granules_from_last_read = 0;
static StorageID getIDFromPart(const MergeTreeData::DataPartPtr & part_)
{
auto table_id = part_->storage.getStorageID();

View File

@ -116,6 +116,10 @@ struct InputOrderInfo
class IMergeTreeDataPart;
using ManyExpressionActions = std::vector<ExpressionActionsPtr>;
struct MergeTreeDataSelectCache;
// The projection selected to execute current query
struct ProjectionCandidate
{
@ -126,6 +130,12 @@ struct ProjectionCandidate
Names required_columns;
NamesAndTypesList aggregation_keys;
AggregateDescriptions aggregate_descriptions;
bool complete = false;
ReadInOrderOptimizerPtr order_optimizer;
InputOrderInfoPtr input_order_info;
ManyExpressionActions group_by_elements_actions;
std::shared_ptr<MergeTreeDataSelectCache> merge_tree_data_select_base_cache;
std::shared_ptr<MergeTreeDataSelectCache> merge_tree_data_select_projection_cache;
};
/** Query along with some additional data,
@ -159,9 +169,12 @@ struct SelectQueryInfo
ClusterPtr getCluster() const { return !optimized_cluster ? cluster : optimized_cluster; }
Names required_columns;
/// If not null, it means we choose a projection to execute current query.
std::optional<ProjectionCandidate> projection;
bool ignore_projections = false;
std::shared_ptr<MergeTreeDataSelectCache> merge_tree_data_select_cache;
};
}