Disable normal projection by the number of granules.

This commit is contained in:
Nikolai Kochetov 2021-04-23 17:06:36 +03:00 committed by Amos Bird
parent 95431168c2
commit 672cfedd13
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
4 changed files with 54 additions and 7 deletions

View File

@ -3817,7 +3817,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
query_ptr, query_context, SelectQueryOptions{QueryProcessingStage::WithMergeableState}.ignoreProjections().ignoreAlias());
const auto & analysis_result = select.getAnalysisResult();
/// If first staging query pipeline is more complex than Aggregating - Expression - Filter - ReadFromStorage, return false
if (analysis_result.join != nullptr || analysis_result.array_join != nullptr || analysis_result.before_aggregation == nullptr)
if (analysis_result.join != nullptr || analysis_result.array_join != nullptr)
return false;
auto query_block = select.getSampleBlock();
@ -3837,6 +3837,12 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
ParserFunction parse_function;
for (const auto & projection : metadata_snapshot->projections)
{
if (projection.type == "aggregate" && !analysis_result.need_aggregate)
continue;
if (projection.type == "normal" && !(analysis_result.hasWhere() || analysis_result.hasPrewhere()))
continue;
bool covered = true;
ASTs expr_names;
Strings maybe_dimension_column_exprs;
@ -3936,7 +3942,10 @@ QueryProcessingStage::Enum MergeTreeData::getQueryProcessingStage(
if (to_stage >= QueryProcessingStage::Enum::WithMergeableState)
{
if (getQueryProcessingStageWithAggregateProjection(query_context, metadata_snapshot, query_info))
return QueryProcessingStage::Enum::WithMergeableState;
{
if (query_info.aggregate_projection->type == "aggregate")
return QueryProcessingStage::Enum::WithMergeableState;
}
}
return QueryProcessingStage::Enum::FetchColumns;

View File

@ -159,6 +159,24 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
max_block_numbers_to_read);
}
/// For normal projection, read anyway.
/// We will chose those which read less granules.
QueryPlanPtr plan_no_projections;
size_t no_projection_granules = 0;
size_t with_projection_granules = 0;
if (query_info.aggregate_projection->type == "normal")
plan_no_projections = readFromParts(
data.getDataPartsVector(),
column_names_to_return,
metadata_snapshot,
query_info,
context,
max_block_size,
num_streams,
max_block_numbers_to_read,
&no_projection_granules);
LOG_DEBUG(log, "Choose projection {}", query_info.aggregate_projection->name);
Pipes pipes;
@ -235,7 +253,9 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
context,
max_block_size,
num_streams,
max_block_numbers_to_read);
max_block_numbers_to_read,
&with_projection_granules,
true);
if (plan)
projection_pipe = plan->convertToPipe(QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
@ -303,6 +323,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
auto interpreter = InterpreterSelectQuery(ast, context, storage_from_source_part, nullptr, SelectQueryOptions{processed_stage}.ignoreAggregation());
ordinary_pipe = QueryPipeline::getPipe(interpreter.execute().pipeline);
with_projection_granules += storage_from_source_part->getNumGranulesFromLastRead();
if (!ordinary_pipe.empty() && processed_stage < QueryProcessingStage::Enum::WithMergeableState)
{
// projection and set bucket number to -1
@ -313,6 +335,11 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
}
}
/// Use normal projection only if we read less granules then without it.
/// TODO: check if read-in-order optimization possible for normal projection.
if (query_info.aggregate_projection->type == "normal" && with_projection_granules > no_projection_granules)
return plan_no_projections;
if (query_info.aggregate_projection->type == "aggregate")
{
/// Here we create shared ManyAggregatedData for both projection and ordinary data.
@ -450,10 +477,12 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
ContextPtr context,
const UInt64 max_block_size,
const unsigned num_streams,
const PartitionIdToMaxBlock * max_block_numbers_to_read) const
const PartitionIdToMaxBlock * max_block_numbers_to_read,
size_t * num_granules_are_to_read,
bool use_projection_metadata) const
{
const StorageMetadataPtr & metadata_snapshot
= query_info.aggregate_projection ? query_info.aggregate_projection->metadata : metadata_snapshot_base;
= (query_info.aggregate_projection && use_projection_metadata) ? query_info.aggregate_projection->metadata : metadata_snapshot_base;
/// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it.
/// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query.
@ -1111,6 +1140,9 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
total_marks_pk.load(std::memory_order_relaxed),
sum_marks, sum_ranges);
if (num_granules_are_to_read)
*num_granules_are_to_read = sum_marks_pk.load(std::memory_order_relaxed);
if (parts_with_ranges.empty())
return std::make_unique<QueryPlan>();

View File

@ -45,7 +45,9 @@ public:
ContextPtr context,
UInt64 max_block_size,
unsigned num_streams,
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr,
size_t * num_granules_are_to_read = nullptr,
bool use_projection_metadata = false) const;
private:
const MergeTreeData & data;

View File

@ -33,7 +33,7 @@ public:
{
QueryPlan query_plan =
std::move(*MergeTreeDataSelectExecutor(parts.front()->storage)
.readFromParts(parts, column_names, metadata_snapshot, query_info, context, max_block_size, num_streams));
.readFromParts(parts, column_names, metadata_snapshot, query_info, context, max_block_size, num_streams, nullptr, &num_granules_from_last_read));
return query_plan.convertToPipe(QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
}
@ -62,6 +62,8 @@ public:
return parts.front()->storage.getPartitionIDFromQuery(ast, context);
}
size_t getNumGranulesFromLastRead() const { return num_granules_from_last_read; }
protected:
StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
: IStorage(getIDFromPart(part_))
@ -80,6 +82,8 @@ protected:
private:
MergeTreeData::DataPartsVector parts;
size_t num_granules_from_last_read = 0;
static StorageID getIDFromPart(const MergeTreeData::DataPartPtr & part_)
{
auto table_id = part_->storage.getStorageID();