Use ActionsDAG to rewrite projection queries

This commit is contained in:
Amos Bird 2021-04-29 15:38:47 +08:00
parent bf95b684a7
commit 35961c0c5d
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
22 changed files with 398 additions and 327 deletions

View File

@ -439,6 +439,87 @@ void ActionsDAG::removeUnusedActions(bool allow_remove_inputs)
inputs.erase(it, inputs.end());
}
NameSet ActionsDAG::foldActionsByProjection(
const NameSet & required_columns, const Block & projection_block_for_keys, const String & predicate_column_name)
{
std::unordered_set<const Node *> visited_nodes;
std::unordered_set<std::string_view> visited_nodes_names;
std::stack<Node *> stack;
std::vector<const ColumnWithTypeAndName *> missing_input_from_projection_keys;
for (const auto & node : index)
{
if (required_columns.find(node->result_name) != required_columns.end() || node->result_name == predicate_column_name)
{
visited_nodes.insert(node);
visited_nodes_names.insert(node->result_name);
stack.push(const_cast<Node *>(node));
}
}
for (const auto & column : required_columns)
{
if (visited_nodes_names.find(column) == visited_nodes_names.end())
{
if (const ColumnWithTypeAndName * column_with_type_name = projection_block_for_keys.findByName(column))
{
const auto * node = &addInput(*column_with_type_name);
index.push_back(node);
visited_nodes.insert(node);
}
else
{
// Missing column
return {};
}
}
}
while (!stack.empty())
{
auto * node = stack.top();
stack.pop();
if (const ColumnWithTypeAndName * column_with_type_name = projection_block_for_keys.findByName(node->result_name))
{
if (node->type != ActionsDAG::ActionType::INPUT)
{
/// Projection folding.
node->type = ActionsDAG::ActionType::INPUT;
node->result_type = std::move(column_with_type_name->type);
node->result_name = std::move(column_with_type_name->name);
node->children.clear();
inputs.push_back(node);
}
}
for (const auto * child : node->children)
{
if (visited_nodes.count(child) == 0)
{
stack.push(const_cast<Node *>(child));
visited_nodes.insert(child);
}
}
}
nodes.remove_if([&](const Node & node) { return visited_nodes.count(&node) == 0; });
std::erase_if(inputs, [&](const Node * node) { return visited_nodes.count(node) == 0; });
std::erase_if(index, [&](const Node * node) { return visited_nodes.count(node) == 0; });
NameSet next_required_columns;
for (const auto & input : inputs)
next_required_columns.insert(input->result_name);
return next_required_columns;
}
void ActionsDAG::addAggregatesViaProjection(const Block & aggregates)
{
for (const auto & aggregate : aggregates)
index.push_back(&addInput(aggregate));
}
void ActionsDAG::addAliases(const NamesWithAliases & aliases)
{
std::unordered_map<std::string_view, size_t> names_map;

View File

@ -168,6 +168,10 @@ public:
void removeUnusedActions(const Names & required_names);
void removeUnusedActions(const NameSet & required_names);
NameSet
foldActionsByProjection(const NameSet & keys, const Block & projection_block_for_keys, const String & predicate_column_name = {});
void addAggregatesViaProjection(const Block & aggregates);
bool hasArrayJoin() const;
bool hasStatefulFunctions() const;
bool trivial() const; /// If actions has no functions or array join.

View File

@ -590,7 +590,8 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
from_stage = storage->getQueryProcessingStage(context, options.to_stage, metadata_snapshot, query_info);
/// XXX Used for IN set index analysis. Is this a proper way?
metadata_snapshot->selected_projection = query_info.projection;
if (query_info.projection)
metadata_snapshot->selected_projection = query_info.projection->desc;
}
/// Do I need to perform the first part of the pipeline?
@ -969,6 +970,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
// To remove additional columns in dry run
// For example, sample column which can be removed in this stage
// TODO There seems to be no place initializing remove_columns_actions
if (expressions.prewhere_info->remove_columns_actions)
{
auto remove_columns = std::make_unique<ExpressionStep>(
@ -1409,19 +1411,23 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(
{
Pipe pipe(std::make_shared<NullSource>(source_header));
if (!query_info.key_actions.func_map.empty())
if (query_info.projection)
{
ASTPtr expr = std::make_shared<ASTExpressionList>();
NamesAndTypesList columns;
for (const auto & [nt, func] : query_info.key_actions.func_map)
if (query_info.projection->before_where)
{
expr->children.push_back(func);
columns.push_back(nt);
auto expression = std::make_shared<ExpressionActions>(
query_info.projection->before_where, ExpressionActionsSettings::fromContext(context_));
pipe.addSimpleTransform(
[&expression](const Block & header) { return std::make_shared<ExpressionTransform>(header, expression); });
}
auto syntax_result = TreeRewriter(context_).analyze(expr, columns);
auto expression = ExpressionAnalyzer(expr, syntax_result, context_).getActions(false);
pipe.addSimpleTransform([&expression](const Block & header) { return std::make_shared<ExpressionTransform>(header, expression); });
if (query_info.projection->before_aggregation)
{
auto expression = std::make_shared<ExpressionActions>(
query_info.projection->before_aggregation, ExpressionActionsSettings::fromContext(context_));
pipe.addSimpleTransform(
[&expression](const Block & header) { return std::make_shared<ExpressionTransform>(header, expression); });
}
}
if (query_info.prewhere_info)
@ -1864,7 +1870,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
backQuoteIfNeed(local_storage_id.getDatabaseName()),
local_storage_id.getFullTableName(),
required_columns,
query_info.projection ? query_info.projection->name : "");
query_info.projection ? query_info.projection->desc->name : "");
}
/// Create step which reads from empty source if storage has no data.
@ -1988,7 +1994,7 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool
/// It is already added by storage (because of performance issues).
/// TODO: We should probably add another one processing stage for storage?
/// WithMergeableStateAfterAggregation is not ok because, e.g., it skips sorting after aggregation.
if (query_info.projection && query_info.projection->type == ProjectionDescription::Type::Aggregate)
if (query_info.projection && query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
return;
const auto & header_before_merge = query_plan.getCurrentDataStream().header;

View File

@ -19,7 +19,6 @@ ReadFromMergeTree::ReadFromMergeTree(
RangesInDataParts parts_,
IndexStatPtr index_stats_,
PrewhereInfoPtr prewhere_info_,
const ProjectionDescription * projection_,
Names virt_column_names_,
Settings settings_,
size_t num_streams_,
@ -36,7 +35,6 @@ ReadFromMergeTree::ReadFromMergeTree(
, parts(std::move(parts_))
, index_stats(std::move(index_stats_))
, prewhere_info(std::move(prewhere_info_))
, projection(projection_)
, virt_column_names(std::move(virt_column_names_))
, settings(std::move(settings_))
, num_streams(num_streams_)
@ -79,7 +77,7 @@ Pipe ReadFromMergeTree::readFromPool()
i, pool, settings.min_marks_for_concurrent_read, settings.max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
storage, metadata_snapshot, settings.use_uncompressed_cache,
prewhere_info, projection, settings.reader_settings, virt_column_names);
prewhere_info, settings.reader_settings, virt_column_names);
if (i == 0)
{
@ -99,7 +97,7 @@ ProcessorPtr ReadFromMergeTree::createSource(const RangesInDataPart & part)
return std::make_shared<TSource>(
storage, metadata_snapshot, part.data_part, settings.max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, required_columns, part.ranges, settings.use_uncompressed_cache,
prewhere_info, projection, true, settings.reader_settings, virt_column_names, part.part_index_in_query);
prewhere_info, true, settings.reader_settings, virt_column_names, part.part_index_in_query);
}
Pipe ReadFromMergeTree::readInOrder()

View File

@ -74,7 +74,6 @@ public:
RangesInDataParts parts_,
IndexStatPtr index_stats_,
PrewhereInfoPtr prewhere_info_,
const ProjectionDescription * projection_,
Names virt_column_names_,
Settings settings_,
size_t num_streams_,
@ -100,7 +99,6 @@ private:
RangesInDataParts parts;
IndexStatPtr index_stats;
PrewhereInfoPtr prewhere_info;
const ProjectionDescription * projection;
Names virt_column_names;
Settings settings;

View File

@ -105,7 +105,7 @@ void IStorage::read(
auto pipe = read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
if (pipe.empty())
{
auto header = (query_info.projection ? query_info.projection->metadata : metadata_snapshot)
auto header = (query_info.projection ? query_info.projection->desc->metadata : metadata_snapshot)
->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, context);
}

View File

@ -26,7 +26,6 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const ProjectionDescription * projection_,
UInt64 max_block_size_rows_,
UInt64 preferred_block_size_bytes_,
UInt64 preferred_max_column_in_block_size_bytes_,
@ -37,7 +36,6 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, prewhere_info(prewhere_info_)
, projection(projection_)
, max_block_size_rows(max_block_size_rows_)
, preferred_block_size_bytes(preferred_block_size_bytes_)
, preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes_)
@ -66,13 +64,6 @@ Chunk MergeTreeBaseSelectProcessor::generate()
if (res.hasRows())
{
injectVirtualColumns(res, task.get(), partition_value_type, virt_column_names);
if (projection)
{
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = -1;
info->is_overflows = false;
res.setChunkInfo(std::move(info));
}
return res;
}
}
@ -260,24 +251,6 @@ static void injectVirtualColumnsImpl(
inserter.insertStringColumn(column, virtual_column_name);
}
else if (virtual_column_name == "_projections")
{
ColumnPtr column;
if (rows)
{
Array projections;
for (const auto & [name, _] : part->getProjectionParts())
projections.push_back(name);
column = DataTypeArray(std::make_shared<DataTypeString>())
.createColumnConst(rows, projections)
->convertToFullColumnIfConst();
}
else
column = DataTypeArray(std::make_shared<DataTypeString>()).createColumn();
inserter.insertArrayOfStringsColumn(column, virtual_column_name);
}
else if (virtual_column_name == "_part_index")
{
ColumnPtr column;

View File

@ -24,7 +24,6 @@ public:
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const ProjectionDescription * projection_,
UInt64 max_block_size_rows_,
UInt64 preferred_block_size_bytes_,
UInt64 preferred_max_column_in_block_size_bytes_,
@ -62,7 +61,6 @@ protected:
StorageMetadataPtr metadata_snapshot;
PrewhereInfoPtr prewhere_info;
const ProjectionDescription * projection;
UInt64 max_block_size_rows;
UInt64 preferred_block_size_bytes;

View File

@ -33,6 +33,7 @@
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Storages/AlterCommands.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataUtils.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
@ -3800,191 +3801,6 @@ bool MergeTreeData::mayBenefitFromIndexForIn(
}
bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
ContextPtr query_context,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info)
{
const auto & settings = query_context->getSettingsRef();
if (!settings.allow_experimental_projection_optimization || query_info.ignore_projections)
return false;
const auto & query_ptr = query_info.query;
InterpreterSelectQuery select(
query_ptr, query_context, SelectQueryOptions{QueryProcessingStage::WithMergeableState}.ignoreProjections().ignoreAlias());
const auto & analysis_result = select.getAnalysisResult();
bool can_use_aggregate_projection = true;
/// If the first stage of the query pipeline is more complex than Aggregating - Expression - Filter - ReadFromStorage,
/// we cannot use aggregate projection.
if (analysis_result.join != nullptr || analysis_result.array_join != nullptr)
can_use_aggregate_projection = false;
auto query_block = select.getSampleBlock();
const auto & required_query = select.getQuery()->as<const ASTSelectQuery &>();
auto required_predicates = [&required_query]() -> ASTPtr
{
if (required_query.prewhere() && required_query.where())
return makeASTFunction("and", required_query.prewhere()->clone(), required_query.where()->clone());
else if (required_query.prewhere())
return required_query.prewhere()->clone();
else if (required_query.where())
return required_query.where()->clone();
else
return nullptr;
}();
/// Check if all needed columns can be provided by some aggregate projection. Here we also try
/// to find expression matches. For example, suppose an aggregate projection contains a column
/// named sum(x) and the given query also has an expression called sum(x), it's a match. This is
/// why we need to ignore all aliases during projection creation and the above query planning.
/// It's also worth noting that, sqrt(sum(x)) will also work because we can treat sum(x) as a
/// required column.
/// TODO we can use ActionsDAG here to make proper check
/// The ownership of ProjectionDescription is hold in metadata_snapshot which lives along with
/// InterpreterSelect, thus we can store the raw pointer here.
struct ProjectionCandidate
{
const ProjectionDescription * desc;
ProjectionKeyActions key_actions;
Names required_columns;
NameSet required_columns_in_predicate;
};
std::vector<ProjectionCandidate> candidates;
ParserFunction parse_function;
for (const auto & projection : metadata_snapshot->projections)
{
if (projection.type == ProjectionDescription::Type::Aggregate && (!analysis_result.need_aggregate || !can_use_aggregate_projection))
continue;
if (projection.type == ProjectionDescription::Type::Normal && !(analysis_result.hasWhere() || analysis_result.hasPrewhere()))
continue;
bool covered = true;
ASTs expr_names;
Strings maybe_dimension_column_exprs;
Block key_block = projection.metadata->primary_key.sample_block;
/// First check if all columns in current query are provided by current projection.
/// Collect missing columns and remove matching columns in key blocks so they aren't used twice.
for (const auto & column_with_type_name : query_block)
{
if (!projection.sample_block.has(column_with_type_name.name))
maybe_dimension_column_exprs.push_back(column_with_type_name.name);
else
{
if (key_block.has(column_with_type_name.name))
key_block.erase(column_with_type_name.name);
}
}
ProjectionKeyActions key_actions;
/// Check if the missing columns can be produced by key columns in projection.
for (const auto & expr_name : maybe_dimension_column_exprs)
{
/// XXX We need AST out from string names. Have to resort to the parser here.
Tokens tokens_number(expr_name.data(), expr_name.data() + expr_name.size());
IParser::Pos pos(tokens_number, settings.max_parser_depth);
Expected expected;
ASTPtr ast;
/// It should be a function call, or else we would match them already.
if (!parse_function.parse(pos, ast, expected))
{
covered = false;
break;
}
// It should be a function call that only requires unused key columns, or else some key
// column might be transformed in different ways, which is not injective.
if (!key_actions.add(ast, expr_name, key_block))
{
covered = false;
break;
}
}
if (covered)
{
Names required_column_names = query_block.getNames();
// Calculate the correct required projection columns
for (auto & name : required_column_names)
{
auto it = key_actions.name_map.find(name);
if (it != key_actions.name_map.end())
name = it->second;
}
ProjectionCondition projection_condition(projection.column_names, required_column_names);
if (required_predicates && !projection_condition.check(required_predicates))
continue;
candidates.push_back(
{&projection,
std::move(key_actions),
projection_condition.getRequiredColumns(),
projection_condition.getRequiredColumnsInPredicate()});
}
}
// Let's select the best aggregate projection to execute the query.
if (!candidates.empty())
{
size_t min_key_size = std::numeric_limits<size_t>::max();
const ProjectionCandidate * selected_candidate = nullptr;
/// Favor aggregate projections
for (const auto & candidate : candidates)
{
// TODO We choose the projection with least key_size. Perhaps we can do better? (key rollups)
if (candidate.desc->type == ProjectionDescription::Type::Aggregate && candidate.desc->key_size < min_key_size)
{
selected_candidate = &candidate;
min_key_size = candidate.desc->key_size;
}
}
/// Select the best normal projection if no aggregate projection is available
if (!selected_candidate)
{
size_t max_num_of_matched_key_columns = 0;
for (const auto & candidate : candidates)
{
NameSet column_names(
candidate.desc->metadata->sorting_key.column_names.begin(), candidate.desc->metadata->sorting_key.column_names.end());
size_t num_of_matched_key_columns = 0;
for (const auto & name : candidate.desc->metadata->sorting_key.column_names)
{
if (candidate.required_columns_in_predicate.find(name) != candidate.required_columns_in_predicate.end())
++num_of_matched_key_columns;
}
/// Select the normal projection that has the most matched key columns in predicate
/// TODO What's the best strategy here?
if (num_of_matched_key_columns > max_num_of_matched_key_columns)
{
selected_candidate = &candidate;
max_num_of_matched_key_columns = num_of_matched_key_columns;
}
}
}
if (!selected_candidate)
return false;
query_info.projection = selected_candidate->desc;
query_info.key_actions = std::move(selected_candidate->key_actions);
query_info.projection_names = std::move(selected_candidate->required_columns);
query_info.projection_block = query_block;
query_info.aggregation_keys = select.getQueryAnalyzer()->aggregationKeys();
query_info.aggregate_descriptions = select.getQueryAnalyzer()->aggregates();
return true;
}
return false;
}
QueryProcessingStage::Enum MergeTreeData::getQueryProcessingStage(
ContextPtr query_context,
QueryProcessingStage::Enum to_stage,
@ -3995,7 +3811,7 @@ QueryProcessingStage::Enum MergeTreeData::getQueryProcessingStage(
{
if (getQueryProcessingStageWithAggregateProjection(query_context, metadata_snapshot, query_info))
{
if (query_info.projection->type == ProjectionDescription::Type::Aggregate)
if (query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
return QueryProcessingStage::Enum::WithMergeableState;
}
}

View File

@ -358,11 +358,6 @@ public:
bool attach,
BrokenPartCallback broken_part_callback_ = [](const String &){});
static bool getQueryProcessingStageWithAggregateProjection(
ContextPtr query_context,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info);
QueryProcessingStage::Enum getQueryProcessingStage(
ContextPtr query_context,
QueryProcessingStage::Enum to_stage,

View File

@ -142,7 +142,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
const PartitionIdToMaxBlock * max_block_numbers_to_read) const
{
const auto & settings = context->getSettingsRef();
if (query_info.projection == nullptr)
if (!query_info.projection)
{
if (settings.allow_experimental_projection_optimization && settings.force_optimize_projection
&& !metadata_snapshot->projections.empty())
@ -165,7 +165,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
size_t no_projection_granules = 0;
size_t with_projection_granules = 0;
if (query_info.projection->type == ProjectionDescription::Type::Normal)
if (query_info.projection->desc->type == ProjectionDescription::Type::Normal)
plan_no_projections = readFromParts(
data.getDataPartsVector(),
column_names_to_return,
@ -177,7 +177,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
max_block_numbers_to_read,
&no_projection_granules);
LOG_DEBUG(log, "Choose projection {}", query_info.projection->name);
LOG_DEBUG(log, "Choose projection {}", query_info.projection->desc->name);
Pipes pipes;
auto parts = data.getDataPartsVector();
@ -187,7 +187,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
for (auto & part : parts)
{
const auto & projections = part->getProjectionParts();
auto it = projections.find(query_info.projection->name);
auto it = projections.find(query_info.projection->desc->name);
if (it != projections.end())
{
projection_parts.push_back(it->second);
@ -214,8 +214,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
{
auto plan = readFromParts(
std::move(projection_parts),
query_info.projection_names, // raw columns without key transformation
query_info.projection->metadata,
query_info.projection->required_columns, // raw columns without key transformation
query_info.projection->desc->metadata,
query_info,
context,
max_block_size,
@ -230,57 +230,32 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
if (!projection_pipe.empty())
{
// If `key_actions` is not empty, transform input blocks by adding needed columns
// If `before_where` is not empty, transform input blocks by adding needed columns
// originated from key columns. We already project the block at the end, using
// projection_block, so we can just add more columns here without worrying
if (!query_info.key_actions.func_map.empty())
// NOTE: prewhere is executed inside readFromParts
if (query_info.projection->before_where)
{
ASTPtr expr = std::make_shared<ASTExpressionList>();
NamesAndTypesList columns;
for (const auto & [nt, func] : query_info.key_actions.func_map)
{
expr->children.push_back(func);
columns.push_back(nt);
}
auto syntax_result = TreeRewriter(context).analyze(expr, columns);
auto expression = ExpressionAnalyzer(expr, syntax_result, context).getActions(false);
auto expression = std::make_shared<ExpressionActions>(
query_info.projection->before_where, ExpressionActionsSettings::fromContext(context));
projection_pipe.addSimpleTransform(
[&expression](const Block & header) { return std::make_shared<ExpressionTransform>(header, expression); });
}
/// In sample block we use just key columns
if (given_select.where())
if (query_info.projection->before_aggregation)
{
// we can use the previous pipeline's sample block here
Block filter_block = projection_pipe.getHeader();
ASTPtr where = given_select.where()->clone();
ProjectionCondition projection_condition(filter_block.getNames(), {});
projection_condition.rewrite(where);
auto where_column_name = where->getColumnName();
auto syntax_result = TreeRewriter(context).analyze(where, filter_block.getNamesAndTypesList());
const auto actions = ExpressionAnalyzer(where, syntax_result, context).getActions(false);
projection_pipe.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType)
{
return std::make_shared<FilterTransform>(header, actions, where_column_name, true);
});
auto expression = std::make_shared<ExpressionActions>(
query_info.projection->before_aggregation, ExpressionActionsSettings::fromContext(context));
projection_pipe.addSimpleTransform(
[&expression](const Block & header) { return std::make_shared<ExpressionTransform>(header, expression); });
}
// Project columns and set bucket number to -1
// optionally holds the reference of parent parts
projection_pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ProjectionPartTransform>(header, query_info.projection_block, std::move(parent_parts));
});
}
}
if (!normal_parts.empty())
{
auto storage_from_source_part = StorageFromMergeTreeDataPart::create(std::move(normal_parts));
auto ast = query_info.projection->query_ast->clone();
auto ast = query_info.projection->desc->query_ast->clone();
auto & select = ast->as<ASTSelectQuery &>();
if (given_select.where())
select.setExpression(ASTSelectQuery::Expression::WHERE, given_select.where()->clone());
@ -293,21 +268,14 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
ordinary_pipe = QueryPipeline::getPipe(interpreter.execute().pipeline);
with_projection_granules += storage_from_source_part->getNumGranulesFromLastRead();
if (!ordinary_pipe.empty() && processed_stage < QueryProcessingStage::Enum::WithMergeableState)
{
// projection and set bucket number to -1
ordinary_pipe.addSimpleTransform(
[&](const Block & header) { return std::make_shared<ProjectionPartTransform>(header, query_info.projection_block); });
}
}
/// Use normal projection only if we read less granules then without it.
/// TODO: check if read-in-order optimization possible for normal projection.
if (query_info.projection->type == ProjectionDescription::Type::Normal && with_projection_granules > no_projection_granules)
if (query_info.projection->desc->type == ProjectionDescription::Type::Normal && with_projection_granules > no_projection_granules)
return plan_no_projections;
if (query_info.projection->type == ProjectionDescription::Type::Aggregate)
if (query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
{
/// Here we create shared ManyAggregatedData for both projection and ordinary data.
/// For ordinary data, AggregatedData is filled in a usual way.
@ -332,12 +300,12 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
std::cerr << "============ header before merge" << std::endl;
std::cerr << header_before_merge.dumpStructure() << std::endl;
ColumnNumbers keys;
for (const auto & key : query_info.aggregation_keys)
for (const auto & key : query_info.projection->aggregation_keys)
keys.push_back(header_before_merge.getPositionByName(key.name));
/// Aggregates are needed to calc proper header for AggregatingTransform result.
/// However, they are not filled cause header from projection pipe already contain aggregate functions.
AggregateDescriptions aggregates = query_info.aggregate_descriptions;
AggregateDescriptions aggregates = query_info.projection->aggregate_descriptions;
/// Aggregator::Params params(header_before_merge, keys, query_info.aggregate_descriptions, overflow_row, settings.max_threads);
Aggregator::Params params(
@ -390,10 +358,10 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
std::cerr << header_before_aggregation.dumpStructure() << std::endl;
ColumnNumbers keys;
for (const auto & key : query_info.aggregation_keys)
for (const auto & key : query_info.projection->aggregation_keys)
keys.push_back(header_before_aggregation.getPositionByName(key.name));
AggregateDescriptions aggregates = query_info.aggregate_descriptions;
AggregateDescriptions aggregates = query_info.projection->aggregate_descriptions;
for (auto & descr : aggregates)
if (descr.arguments.empty())
for (const auto & name : descr.argument_names)
@ -458,7 +426,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
bool use_projection_metadata) const
{
const StorageMetadataPtr & metadata_snapshot
= (query_info.projection && use_projection_metadata) ? query_info.projection->metadata : metadata_snapshot_base;
= (query_info.projection && use_projection_metadata) ? query_info.projection->desc->metadata : metadata_snapshot_base;
/// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it.
/// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query.
@ -1269,6 +1237,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
plan->addStep(std::move(adding_column));
}
// TODO There seems to be no place initializing remove_columns_actions
if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions)
{
auto expression_step = std::make_unique<ExpressionStep>(
@ -1411,8 +1380,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
column_names,
std::move(parts),
std::move(index_stats),
query_info.prewhere_info,
query_info.projection,
query_info.projection ? query_info.projection->prewhere_info : query_info.prewhere_info,
virt_columns,
step_settings,
num_streams,
@ -1621,8 +1589,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
column_names,
std::move(new_parts),
std::move(index_stats),
query_info.prewhere_info,
query_info.projection,
query_info.projection ? query_info.projection->prewhere_info : query_info.prewhere_info,
virt_columns,
step_settings,
num_streams,
@ -1805,8 +1772,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
column_names,
std::move(new_parts),
std::move(index_stats),
query_info.prewhere_info,
query_info.projection,
query_info.projection ? query_info.projection->prewhere_info : query_info.prewhere_info,
virt_columns,
step_settings,
num_streams,
@ -1897,8 +1863,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
column_names,
std::move(lonely_parts),
std::move(index_stats),
query_info.prewhere_info,
query_info.projection,
query_info.projection ? query_info.projection->prewhere_info : query_info.prewhere_info,
virt_columns,
step_settings,
num_streams_for_lonely_parts,

View File

@ -0,0 +1,222 @@
#include <Storages/MergeTree/MergeTreeDataUtils.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectQuery.h>
namespace DB
{
bool getQueryProcessingStageWithAggregateProjection(
ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info)
{
const auto & settings = query_context->getSettingsRef();
if (!settings.allow_experimental_projection_optimization || query_info.ignore_projections)
return false;
const auto & query_ptr = query_info.query;
InterpreterSelectQuery select(
query_ptr, query_context, SelectQueryOptions{QueryProcessingStage::WithMergeableState}.ignoreProjections().ignoreAlias());
const auto & analysis_result = select.getAnalysisResult();
bool can_use_aggregate_projection = true;
/// If the first stage of the query pipeline is more complex than Aggregating - Expression - Filter - ReadFromStorage,
/// we cannot use aggregate projection.
if (analysis_result.join != nullptr || analysis_result.array_join != nullptr)
can_use_aggregate_projection = false;
auto query_block = select.getSampleBlock();
const auto & required_query = select.getQuery()->as<const ASTSelectQuery &>();
auto required_predicates = [&required_query]() -> ASTPtr
{
if (required_query.prewhere() && required_query.where())
return makeASTFunction("and", required_query.prewhere()->clone(), required_query.where()->clone());
else if (required_query.prewhere())
return required_query.prewhere()->clone();
else if (required_query.where())
return required_query.where()->clone();
else
return nullptr;
}();
/// Check if all needed columns can be provided by some aggregate projection. Here we also try
/// to find expression matches. For example, suppose an aggregate projection contains a column
/// named sum(x) and the given query also has an expression called sum(x), it's a match. This is
/// why we need to ignore all aliases during projection creation and the above query planning.
/// It's also worth noting that, sqrt(sum(x)) will also work because we can treat sum(x) as a
/// required column.
/// The ownership of ProjectionDescription is hold in metadata_snapshot which lives along with
/// InterpreterSelect, thus we can store the raw pointer here.
std::vector<ProjectionCandidate> candidates;
NameSet keys;
for (const auto & desc : select.getQueryAnalyzer()->aggregationKeys())
keys.insert(desc.name);
// All required columns should be provided by either current projection or previous actions
// Let's traverse backward to finish the check.
// TODO what if there is a column with name sum(x) and an aggregate sum(x)?
auto rewrite_before_where =
[&](ProjectionCandidate & candidate, const ProjectionDescription & projection, NameSet & required_columns, const Block & aggregates)
{
if (analysis_result.before_where)
{
candidate.before_where = analysis_result.before_where->clone();
required_columns = candidate.before_where->foldActionsByProjection(
required_columns, projection.sample_block_for_keys, query_ptr->as<const ASTSelectQuery &>().where()->getColumnName());
if (required_columns.empty())
return false;
candidate.before_where->addAggregatesViaProjection(aggregates);
}
if (analysis_result.prewhere_info)
{
auto & prewhere_info = analysis_result.prewhere_info;
candidate.prewhere_info = std::make_shared<PrewhereInfo>();
candidate.prewhere_info->prewhere_column_name = prewhere_info->prewhere_column_name;
candidate.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column;
candidate.prewhere_info->row_level_column_name = prewhere_info->row_level_column_name;
candidate.prewhere_info->need_filter = prewhere_info->need_filter;
auto actions_settings = ExpressionActionsSettings::fromSettings(query_context->getSettingsRef());
auto prewhere_actions = prewhere_info->prewhere_actions->clone();
NameSet prewhere_required_columns;
prewhere_required_columns = prewhere_actions->foldActionsByProjection(
prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->prewhere_column_name);
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->prewhere_actions = std::make_shared<ExpressionActions>(prewhere_actions, actions_settings);
if (prewhere_info->row_level_filter_actions)
{
auto row_level_filter_actions = prewhere_info->row_level_filter_actions->clone();
prewhere_required_columns = row_level_filter_actions->foldActionsByProjection(
prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->row_level_column_name);
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->row_level_filter
= std::make_shared<ExpressionActions>(row_level_filter_actions, actions_settings);
}
if (prewhere_info->alias_actions)
{
auto alias_actions = prewhere_info->alias_actions->clone();
prewhere_required_columns
= alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys);
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->alias_actions = std::make_shared<ExpressionActions>(alias_actions, actions_settings);
}
required_columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
}
return true;
};
for (const auto & projection : metadata_snapshot->projections)
{
ProjectionCandidate candidate{};
candidate.desc = &projection;
if (projection.type == ProjectionDescription::Type::Aggregate && analysis_result.need_aggregate && can_use_aggregate_projection)
{
bool match = true;
Block aggregates;
// Let's first check if all aggregates are provided by current projection
for (const auto & aggregate : select.getQueryAnalyzer()->aggregates())
{
const auto * column = projection.sample_block.findByName(aggregate.column_name);
if (column)
{
aggregates.insert(*column);
}
else
{
match = false;
break;
}
}
if (!match)
continue;
// Check if all aggregation keys can be either provided by some action, or by current
// projection directly. Reshape the `before_aggregation` action DAG so that it only
// needs to provide aggregation keys, and certain children DAG might be substituted by
// some keys in projection.
candidate.before_aggregation = analysis_result.before_aggregation->clone();
auto required_columns = candidate.before_aggregation->foldActionsByProjection(keys, projection.sample_block_for_keys);
if (required_columns.empty())
continue;
// Attach aggregates
candidate.before_aggregation->addAggregatesViaProjection(aggregates);
if (rewrite_before_where(candidate, projection, required_columns, aggregates))
{
candidate.required_columns = {required_columns.begin(), required_columns.end()};
for (const auto & aggregate : aggregates)
candidate.required_columns.push_back(aggregate.name);
candidates.push_back(std::move(candidate));
}
}
if (projection.type == ProjectionDescription::Type::Normal && (analysis_result.hasWhere() || analysis_result.hasPrewhere()))
{
NameSet required_columns;
if (analysis_result.hasWhere())
{
for (const auto & column : analysis_result.before_where->getResultColumns())
required_columns.insert(column.name);
}
else
{
for (const auto & column : analysis_result.prewhere_info->prewhere_actions->getResultColumns())
required_columns.insert(column.name);
}
if (rewrite_before_where(candidate, projection, required_columns, {}))
{
candidate.required_columns = {required_columns.begin(), required_columns.end()};
candidates.push_back(std::move(candidate));
}
}
}
// Let's select the best aggregate projection to execute the query.
if (!candidates.empty())
{
size_t min_key_size = std::numeric_limits<size_t>::max();
ProjectionCandidate * selected_candidate = nullptr;
/// Favor aggregate projections
for (auto & candidate : candidates)
{
// TODO We choose the projection with least key_size. Perhaps we can do better? (key rollups)
if (candidate.desc->type == ProjectionDescription::Type::Aggregate && candidate.desc->key_size < min_key_size)
{
selected_candidate = &candidate;
min_key_size = candidate.desc->key_size;
}
}
/// TODO Select the best normal projection if no aggregate projection is available
if (!selected_candidate)
{
for (auto & candidate : candidates)
selected_candidate = &candidate;
}
if (!selected_candidate)
return false;
if (selected_candidate->desc->type == ProjectionDescription::Type::Aggregate)
{
selected_candidate->aggregation_keys = select.getQueryAnalyzer()->aggregationKeys();
selected_candidate->aggregate_descriptions = select.getQueryAnalyzer()->aggregates();
}
query_info.projection = std::move(*selected_candidate);
return true;
}
return false;
}
}

View File

@ -0,0 +1,13 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageInMemoryMetadata.h>
namespace DB
{
bool getQueryProcessingStageWithAggregateProjection(
ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info);
}

View File

@ -23,7 +23,6 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
MarkRanges mark_ranges_,
bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
const ProjectionDescription * projection_,
bool check_columns,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_,
@ -32,7 +31,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
:
MergeTreeBaseSelectProcessor{
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
storage_, metadata_snapshot_, prewhere_info_, projection_, max_block_size_rows_,
storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},

View File

@ -27,7 +27,6 @@ public:
MarkRanges mark_ranges,
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
const ProjectionDescription * projection_,
bool check_columns,
const MergeTreeReaderSettings & reader_settings,
const Names & virt_column_names = {},

View File

@ -23,7 +23,6 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
MarkRanges mark_ranges_,
bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
const ProjectionDescription * projection_,
bool check_columns_,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_,
@ -32,7 +31,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
:
MergeTreeBaseSelectProcessor{
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
storage_, metadata_snapshot_, prewhere_info_, projection_, max_block_size_rows_,
storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},

View File

@ -27,7 +27,6 @@ public:
MarkRanges mark_ranges,
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
const ProjectionDescription * projection_,
bool check_columns,
const MergeTreeReaderSettings & reader_settings,
const Names & virt_column_names = {},

View File

@ -19,13 +19,11 @@ MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcess
const StorageMetadataPtr & metadata_snapshot_,
const bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
const ProjectionDescription * projection_,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_)
:
MergeTreeBaseSelectProcessor{
pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_, projection_,
max_block_size_rows_,
pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
thread{thread_},

View File

@ -25,7 +25,6 @@ public:
const StorageMetadataPtr & metadata_snapshot_,
const bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
const ProjectionDescription * projection_,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_);

View File

@ -69,6 +69,7 @@ ProjectionDescription ProjectionDescription::clone() const
other.column_names = column_names;
other.data_types = data_types;
other.sample_block = sample_block;
other.sample_block_for_keys = sample_block_for_keys;
other.metadata = metadata;
other.key_size = key_size;
@ -135,6 +136,13 @@ ProjectionDescription::getProjectionFromAST(const ASTPtr & definition_ast, const
result.required_columns = select.getRequiredColumns();
result.sample_block = select.getSampleBlock();
const auto & analysis_result = select.getAnalysisResult();
if (analysis_result.need_aggregate)
{
for (const auto & key : select.getQueryAnalyzer()->aggregationKeys())
result.sample_block_for_keys.insert({nullptr, key.type, key.name});
}
for (size_t i = 0; i < result.sample_block.columns(); ++i)
{
const auto & column_with_type_name = result.sample_block.getByPosition(i);

View File

@ -56,6 +56,8 @@ struct ProjectionDescription
/// Sample block with projection columns. (NOTE: columns in block are empty, but not nullptr)
Block sample_block;
Block sample_block_for_keys;
StorageMetadataPtr metadata;
size_t key_size = 0;

View File

@ -116,6 +116,18 @@ struct InputOrderInfo
class IMergeTreeDataPart;
// The projection selected to execute current query
struct ProjectionCandidate
{
const ProjectionDescription * desc;
PrewhereInfoPtr prewhere_info;
ActionsDAGPtr before_where;
ActionsDAGPtr before_aggregation;
Names required_columns;
NamesAndTypesList aggregation_keys;
AggregateDescriptions aggregate_descriptions;
};
/** Query along with some additional data,
* that can be used during query processing
* inside storage engines.
@ -148,21 +160,8 @@ struct SelectQueryInfo
ClusterPtr getCluster() const { return !optimized_cluster ? cluster : optimized_cluster; }
/// If not null, it means we choose a projection to execute current query.
const ProjectionDescription * projection{};
ProjectionKeyActions key_actions;
Names projection_names;
Block projection_block;
NamesAndTypesList aggregation_keys;
AggregateDescriptions aggregate_descriptions;
std::optional<ProjectionCandidate> projection;
bool ignore_projections = false;
/// TODO Store to-be-scanned data parts if some aggregate projection is used
using DataPart = IMergeTreeDataPart;
using DataPartPtr = std::shared_ptr<const DataPart>;
using DataPartsVector = std::vector<DataPartPtr>;
DataPartsVector projection_parts;
DataPartsVector parent_parts;
DataPartsVector normal_parts;
};
}