Fix two cases of projection analysis.

1. Fix projection analysis with partitions.
2. Apply alter conversions during projection analysis.
Amos Bird 2023-11-09 17:59:35 +08:00
parent 0d2277b6c1
commit 16578d97a3
17 changed files with 169 additions and 111 deletions

View File

@@ -411,7 +411,6 @@ struct MinMaxProjectionCandidate
{
AggregateProjectionCandidate candidate;
Block block;
MergeTreeData::DataPartsVector normal_parts;
};
struct AggregateProjectionCandidates
@@ -477,7 +476,6 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
{
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection analyzed DAG {}", proj_dag->dumpDAG());
AggregateProjectionCandidate candidate{.info = std::move(info), .dag = std::move(proj_dag)};
MergeTreeData::DataPartsVector minmax_projection_normal_parts;
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection sample block {}", sample_block.dumpStructure());
auto block = reading.getMergeTreeData().getMinMaxCountProjectionBlock(
@@ -486,13 +484,12 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
dag.filter_node != nullptr,
query_info,
parts,
minmax_projection_normal_parts,
max_added_blocks.get(),
context);
// LOG_TRACE(&Poco::Logger::get("optimizeUseProjections"), "Projection sample block 2 {}", block.dumpStructure());
// minmax_count_projection cannot be used used when there is no data to process, because
// minmax_count_projection cannot be used when there is no data to process, because
// it will produce incorrect result during constant aggregation.
// See https://github.com/ClickHouse/ClickHouse/issues/36728
if (block)
@@ -500,7 +497,6 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
MinMaxProjectionCandidate minmax;
minmax.candidate = std::move(candidate);
minmax.block = std::move(block);
minmax.normal_parts = std::move(minmax_projection_normal_parts);
minmax.candidate.projection = projection;
candidates.minmax_projection.emplace(std::move(minmax));
}
@@ -509,6 +505,18 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
if (!candidates.minmax_projection)
{
auto it = std::find_if(agg_projections.begin(), agg_projections.end(), [&](const auto * projection)
{
return projection->name == context->getSettings().preferred_optimize_projection_name.value;
});
if (it != agg_projections.end())
{
const ProjectionDescription * preferred_projection = *it;
agg_projections.clear();
agg_projections.push_back(preferred_projection);
}
candidates.real.reserve(agg_projections.size());
for (const auto * projection : agg_projections)
{
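
A minimal, self-contained sketch of the preferred-projection filtering introduced above (simplified types, not the actual ClickHouse declarations; filterToPreferred is a hypothetical helper): if the user named a preferred projection in settings, the candidate list is narrowed to just that projection, otherwise it is left untouched.

#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

struct ProjectionDescription { std::string name; };

// Narrow the candidate list to the projection named by the user, if present.
void filterToPreferred(std::vector<const ProjectionDescription *> & projections, const std::string & preferred_name)
{
    auto it = std::find_if(projections.begin(), projections.end(),
        [&](const auto * projection) { return projection->name == preferred_name; });
    if (it != projections.end())
    {
        const ProjectionDescription * preferred = *it;
        projections.clear();
        projections.push_back(preferred);
    }
}

int main()
{
    ProjectionDescription p1{"p1"}, p2{"p2"};
    std::vector<const ProjectionDescription *> candidates{&p1, &p2};
    filterToPreferred(candidates, "p2");
    std::cout << candidates.size() << " candidate(s), first: " << candidates[0]->name << '\n';
}
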
@@ -570,57 +578,66 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
auto candidates = getAggregateProjectionCandidates(node, *aggregating, *reading, max_added_blocks, allow_implicit_projections);
AggregateProjectionCandidate * best_candidate = nullptr;
if (candidates.minmax_projection)
best_candidate = &candidates.minmax_projection->candidate;
else if (candidates.real.empty())
return false;
const auto & parts = reading->getParts();
const auto & alter_conversions = reading->getAlterConvertionsForParts();
const auto & query_info = reading->getQueryInfo();
const auto metadata = reading->getStorageMetadata();
ContextPtr context = reading->getContext();
MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
auto ordinary_reading_select_result = reading->selectRangesToRead(parts, /* alter_conversions = */ {});
size_t ordinary_reading_marks = ordinary_reading_select_result->marks();
const auto & proj_name_from_settings = context->getSettings().preferred_optimize_projection_name.value;
bool found_best_candidate = false;
/// Selecting best candidate.
for (auto & candidate : candidates.real)
AggregateProjectionCandidate * best_candidate = nullptr;
if (candidates.minmax_projection)
{
auto required_column_names = candidate.dag->getRequiredColumnsNames();
ActionDAGNodes added_filter_nodes;
if (candidates.has_filter)
added_filter_nodes.nodes.push_back(candidate.dag->getOutputs().front());
best_candidate = &candidates.minmax_projection->candidate;
}
else if (!candidates.real.empty())
{
auto ordinary_reading_select_result = reading->selectRangesToRead(parts, alter_conversions);
size_t ordinary_reading_marks = ordinary_reading_select_result->marks();
const auto & parts_with_ranges = ordinary_reading_select_result->partsWithRanges();
bool analyzed = analyzeProjectionCandidate(
candidate, *reading, reader, required_column_names, parts,
metadata, query_info, context, max_added_blocks, added_filter_nodes);
if (!analyzed)
continue;
if (candidate.sum_marks > ordinary_reading_marks)
continue;
if ((best_candidate == nullptr || best_candidate->sum_marks > candidate.sum_marks) && !found_best_candidate)
best_candidate = &candidate;
if (!proj_name_from_settings.empty() && candidate.projection->name == proj_name_from_settings)
/// Selecting best candidate.
for (auto & candidate : candidates.real)
{
best_candidate = &candidate;
found_best_candidate = true;
auto required_column_names = candidate.dag->getRequiredColumnsNames();
ActionDAGNodes added_filter_nodes;
if (candidates.has_filter)
added_filter_nodes.nodes.push_back(candidate.dag->getOutputs().front());
bool analyzed = analyzeProjectionCandidate(
candidate,
*reading,
reader,
required_column_names,
parts_with_ranges,
metadata,
query_info,
context,
max_added_blocks,
added_filter_nodes);
if (!analyzed)
continue;
if (candidate.sum_marks > ordinary_reading_marks)
continue;
if (best_candidate == nullptr || best_candidate->sum_marks > candidate.sum_marks)
best_candidate = &candidate;
}
if (!best_candidate)
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
}
}
if (!best_candidate)
else
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
}
chassert(best_candidate != nullptr);
QueryPlanStepPtr projection_reading;
bool has_ordinary_parts;
@@ -641,9 +658,7 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
.storage_id = reading->getMergeTreeData().getStorageID(),
.projection_name = candidates.minmax_projection->candidate.projection->name,
});
has_ordinary_parts = !candidates.minmax_projection->normal_parts.empty();
if (has_ordinary_parts)
reading->resetParts(std::move(candidates.minmax_projection->normal_parts));
has_ordinary_parts = false;
}
else
{

View File

@@ -10,7 +10,7 @@
#include <Storages/ProjectionsDescription.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <stack>
#include <algorithm>
namespace DB::QueryPlanOptimizations
{
@@ -109,6 +109,19 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
if (normal_projections.empty())
return false;
ContextPtr context = reading->getContext();
auto it = std::find_if(normal_projections.begin(), normal_projections.end(), [&](const auto * projection)
{
return projection->name == context->getSettings().preferred_optimize_projection_name.value;
});
if (it != normal_projections.end())
{
const ProjectionDescription * preferred_projection = *it;
normal_projections.clear();
normal_projections.push_back(preferred_projection);
}
QueryDAG query;
{
auto & child = iter->node->children[iter->next_child - 1];
@@ -124,30 +137,16 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
const Names & required_columns = reading->getRealColumnNames();
const auto & parts = reading->getParts();
const auto & alter_conversions = reading->getAlterConvertionsForParts();
const auto & query_info = reading->getQueryInfo();
ContextPtr context = reading->getContext();
MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
auto ordinary_reading_select_result = reading->selectRangesToRead(parts, /* alter_conversions = */ {});
auto ordinary_reading_select_result = reading->selectRangesToRead(parts, alter_conversions);
size_t ordinary_reading_marks = ordinary_reading_select_result->marks();
const auto & parts_with_ranges = ordinary_reading_select_result->partsWithRanges();
std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks = getMaxAddedBlocks(reading);
// Here we iterate over the projections and check if we have the same projections as we specified in preferred_projection_name
bool is_projection_found = false;
const auto & proj_name_from_settings = context->getSettings().preferred_optimize_projection_name.value;
if (!proj_name_from_settings.empty())
{
for (const auto * projection : normal_projections)
{
if (projection->name == proj_name_from_settings)
{
is_projection_found = true;
break;
}
}
}
for (const auto * projection : normal_projections)
{
if (!hasAllRequiredColumns(projection, required_columns))
@@ -161,8 +160,16 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
added_filter_nodes.nodes.push_back(query.filter_node);
bool analyzed = analyzeProjectionCandidate(
candidate, *reading, reader, required_columns, parts,
metadata, query_info, context, max_added_blocks, added_filter_nodes);
candidate,
*reading,
reader,
required_columns,
parts_with_ranges,
metadata,
query_info,
context,
max_added_blocks,
added_filter_nodes);
if (!analyzed)
continue;
@@ -170,9 +177,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
if (candidate.sum_marks >= ordinary_reading_marks)
continue;
if (!is_projection_found && (best_candidate == nullptr || candidate.sum_marks < best_candidate->sum_marks))
best_candidate = &candidate;
else if (is_projection_found && projection->name == proj_name_from_settings)
if (best_candidate == nullptr || candidate.sum_marks < best_candidate->sum_marks)
best_candidate = &candidate;
}

View File

@@ -210,7 +210,7 @@ bool analyzeProjectionCandidate(
const ReadFromMergeTree & reading,
const MergeTreeDataSelectExecutor & reader,
const Names & required_column_names,
const MergeTreeData::DataPartsVector & parts,
const RangesInDataParts & parts_with_ranges,
const StorageMetadataPtr & metadata,
const SelectQueryInfo & query_info,
const ContextPtr & context,
@@ -219,14 +219,20 @@ bool analyzeProjectionCandidate(
{
MergeTreeData::DataPartsVector projection_parts;
MergeTreeData::DataPartsVector normal_parts;
for (const auto & part : parts)
std::vector<AlterConversionsPtr> alter_conversions;
for (const auto & part_with_ranges : parts_with_ranges)
{
const auto & created_projections = part->getProjectionParts();
const auto & created_projections = part_with_ranges.data_part->getProjectionParts();
auto it = created_projections.find(candidate.projection->name);
if (it != created_projections.end())
{
projection_parts.push_back(it->second);
}
else
normal_parts.push_back(part);
{
normal_parts.push_back(part_with_ranges.data_part);
alter_conversions.push_back(part_with_ranges.alter_conversions);
}
}
if (projection_parts.empty())
@@ -252,7 +258,8 @@
if (!normal_parts.empty())
{
auto normal_result_ptr = reading.selectRangesToRead(std::move(normal_parts), /* alter_conversions = */ {});
/// TODO: We can reuse existing analysis_result by filtering out projection parts
auto normal_result_ptr = reading.selectRangesToRead(std::move(normal_parts), std::move(alter_conversions));
if (normal_result_ptr->error())
return false;
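
A toy version of the part-splitting loop above (hypothetical simplified types, not ClickHouse code): parts that already materialized the projection are read through it, while the remaining parts fall back to a normal read that now carries each part's own alter conversions rather than an empty list.

#include <iostream>
#include <string>
#include <vector>

struct PartWithRanges
{
    std::string data_part;
    std::string alter_conversions; // stand-in for pending ALTER rewrites, e.g. a column rename
    bool has_projection = false;
};

int main()
{
    std::vector<PartWithRanges> parts_with_ranges = {
        {"part_1", "rename j -> k", true},
        {"part_2", "rename j -> k", false}, // projection not materialized yet
    };

    std::vector<std::string> projection_parts, normal_parts, alter_conversions;
    for (const auto & part : parts_with_ranges)
    {
        if (part.has_projection)
            projection_parts.push_back(part.data_part);
        else
        {
            // Keep the part and its conversions paired; before this commit the
            // fallback read received an empty conversions list.
            normal_parts.push_back(part.data_part);
            alter_conversions.push_back(part.alter_conversions);
        }
    }

    std::cout << projection_parts.size() << " projection part(s), " << normal_parts.size() << " normal part(s)\n";
}
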

View File

@@ -19,6 +19,7 @@ using MergeTreeDataSelectAnalysisResultPtr = std::shared_ptr<MergeTreeDataSelect
class IMergeTreeDataPart;
using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
using DataPartsVector = std::vector<DataPartPtr>;
struct RangesInDataParts;
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
@@ -71,7 +72,7 @@ bool analyzeProjectionCandidate(
const ReadFromMergeTree & reading,
const MergeTreeDataSelectExecutor & reader,
const Names & required_column_names,
const DataPartsVector & parts,
const RangesInDataParts & parts_with_ranges,
const StorageMetadataPtr & metadata,
const SelectQueryInfo & query_info,
const ContextPtr & context,

View File

@@ -2258,10 +2258,7 @@ size_t MergeTreeDataSelectAnalysisResult::marks() const
if (std::holds_alternative<std::exception_ptr>(result))
std::rethrow_exception(std::get<std::exception_ptr>(result));
const auto & index_stats = std::get<ReadFromMergeTree::AnalysisResult>(result).index_stats;
if (index_stats.empty())
return 0;
return index_stats.back().num_granules_after;
return std::get<ReadFromMergeTree::AnalysisResult>(result).selected_marks;
}
UInt64 MergeTreeDataSelectAnalysisResult::rows() const
@@ -2269,9 +2266,15 @@ UInt64 MergeTreeDataSelectAnalysisResult::rows() const
if (std::holds_alternative<std::exception_ptr>(result))
std::rethrow_exception(std::get<std::exception_ptr>(result));
const auto & index_stats = std::get<ReadFromMergeTree::AnalysisResult>(result).index_stats;
if (index_stats.empty())
return 0;
return std::get<ReadFromMergeTree::AnalysisResult>(result).selected_rows;
}
const RangesInDataParts & MergeTreeDataSelectAnalysisResult::partsWithRanges() const
{
if (std::holds_alternative<std::exception_ptr>(result))
std::rethrow_exception(std::get<std::exception_ptr>(result));
return std::get<ReadFromMergeTree::AnalysisResult>(result).parts_with_ranges;
}
}
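
The accessors above all follow the same store-a-value-or-an-exception idiom; a self-contained sketch of that pattern with assumed, simplified names:

#include <cstddef>
#include <exception>
#include <iostream>
#include <stdexcept>
#include <variant>

struct AnalysisResult { std::size_t selected_marks = 0; };

struct ResultHolder
{
    // Either a finished analysis or the exception that aborted it.
    std::variant<AnalysisResult, std::exception_ptr> result;

    std::size_t marks() const
    {
        // Every getter rethrows a stored exception before touching the value.
        if (std::holds_alternative<std::exception_ptr>(result))
            std::rethrow_exception(std::get<std::exception_ptr>(result));
        return std::get<AnalysisResult>(result).selected_marks;
    }
};

int main()
{
    ResultHolder ok{AnalysisResult{42}};
    std::cout << ok.marks() << '\n'; // 42

    ResultHolder bad{std::make_exception_ptr(std::runtime_error("analysis failed"))};
    try { bad.marks(); }
    catch (const std::runtime_error & e) { std::cout << e.what() << '\n'; }
}
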

View File

@@ -197,13 +197,9 @@ public:
bool hasAnalyzedResult() const { return analyzed_result_ptr != nullptr; }
void setAnalyzedResult(MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr_) { analyzed_result_ptr = std::move(analyzed_result_ptr_); }
void resetParts(MergeTreeData::DataPartsVector parts)
{
prepared_parts = std::move(parts);
alter_conversions_for_parts = {};
}
const MergeTreeData::DataPartsVector & getParts() const { return prepared_parts; }
const std::vector<AlterConversionsPtr> & getAlterConvertionsForParts() const { return alter_conversions_for_parts; }
const MergeTreeData & getMergeTreeData() const { return data; }
size_t getMaxBlockSize() const { return block_size.max_block_size_rows; }
size_t getNumStreams() const { return requested_num_streams; }
@@ -310,6 +306,7 @@ struct MergeTreeDataSelectAnalysisResult
bool error() const;
size_t marks() const;
UInt64 rows() const;
const RangesInDataParts & partsWithRanges() const;
};
}

View File

@@ -6498,7 +6498,6 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
bool has_filter,
const SelectQueryInfo & query_info,
const DataPartsVector & parts,
DataPartsVector & normal_parts,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
ContextPtr query_context) const
{
@@ -6623,11 +6622,11 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
continue;
}
/// It's extremely rare that some parts have final marks while others don't. To make it
/// straightforward, disable minmax_count projection when `max(pk)' encounters any part with
/// no final mark.
if (need_primary_key_max_column && !part->index_granularity.hasFinalMark())
{
normal_parts.push_back(part);
continue;
}
return {};
real_parts.push_back(part);
filter_column_data.back() = 1;

View File

@@ -401,17 +401,12 @@ public:
/// query_info - used to filter unneeded parts
///
/// parts - part set to filter
///
/// normal_parts - collects parts that don't have all the needed values to form the block.
/// Specifically, this is when a part doesn't contain a final mark and the related max value is
/// required.
Block getMinMaxCountProjectionBlock(
const StorageMetadataPtr & metadata_snapshot,
const Names & required_columns,
bool has_filter,
const SelectQueryInfo & query_info,
const DataPartsVector & parts,
DataPartsVector & normal_parts,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
ContextPtr query_context) const;

View File

@@ -828,8 +828,8 @@ std::optional<std::unordered_set<String>> MergeTreeDataSelectExecutor::filterPar
}
void MergeTreeDataSelectExecutor::filterPartsByPartition(
std::optional<PartitionPruner> & partition_pruner,
std::optional<KeyCondition> & minmax_idx_condition,
const std::optional<PartitionPruner> & partition_pruner,
const std::optional<KeyCondition> & minmax_idx_condition,
MergeTreeData::DataPartsVector & parts,
std::vector<AlterConversionsPtr> & alter_conversions,
const std::optional<std::unordered_set<String>> & part_values,
@@ -1288,6 +1288,8 @@ MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar
selectColumnNames(column_names_to_return, data, real_column_names, virt_column_names, sample_factor_column_queried);
std::optional<ReadFromMergeTree::Indexes> indexes;
/// NOTE: We don't need alter_conversions because the returned analysis_result is only used for:
/// 1. estimate the number of rows to read; 2. projection reading, which doesn't have alter_conversions.
return ReadFromMergeTree::selectRangesToRead(
std::move(parts),
/*alter_conversions=*/ {},
@@ -1824,7 +1826,7 @@ void MergeTreeDataSelectExecutor::selectPartsToRead(
const std::optional<std::unordered_set<String>> & part_values,
const std::optional<KeyCondition> & minmax_idx_condition,
const DataTypes & minmax_columns_types,
std::optional<PartitionPruner> & partition_pruner,
const std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
PartFilterCounters & counters)
{
@@ -1886,7 +1888,7 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
MergeTreeData::PinnedPartUUIDsPtr pinned_part_uuids,
const std::optional<KeyCondition> & minmax_idx_condition,
const DataTypes & minmax_columns_types,
std::optional<PartitionPruner> & partition_pruner,
const std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
ContextPtr query_context,
PartFilterCounters & counters,

View File

@@ -126,7 +126,7 @@ private:
const std::optional<std::unordered_set<String>> & part_values,
const std::optional<KeyCondition> & minmax_idx_condition,
const DataTypes & minmax_columns_types,
std::optional<PartitionPruner> & partition_pruner,
const std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
PartFilterCounters & counters);
@@ -138,7 +138,7 @@ private:
MergeTreeData::PinnedPartUUIDsPtr pinned_part_uuids,
const std::optional<KeyCondition> & minmax_idx_condition,
const DataTypes & minmax_columns_types,
std::optional<PartitionPruner> & partition_pruner,
const std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
ContextPtr query_context,
PartFilterCounters & counters,
@@ -178,8 +178,8 @@ public:
/// Filter parts using minmax index and partition key.
static void filterPartsByPartition(
std::optional<PartitionPruner> & partition_pruner,
std::optional<KeyCondition> & minmax_idx_condition,
const std::optional<PartitionPruner> & partition_pruner,
const std::optional<KeyCondition> & minmax_idx_condition,
MergeTreeData::DataPartsVector & parts,
std::vector<AlterConversionsPtr> & alter_conversions,
const std::optional<std::unordered_set<String>> & part_values,

View File

@@ -31,7 +31,7 @@ PartitionPruner::PartitionPruner(const StorageMetadataPtr & metadata, ActionsDAG
{
}
bool PartitionPruner::canBePruned(const IMergeTreeDataPart & part)
bool PartitionPruner::canBePruned(const IMergeTreeDataPart & part) const
{
if (part.isEmpty())
return true;

View File

@@ -16,14 +16,15 @@ public:
PartitionPruner(const StorageMetadataPtr & metadata, const SelectQueryInfo & query_info, ContextPtr context, bool strict);
PartitionPruner(const StorageMetadataPtr & metadata, ActionsDAGPtr filter_actions_dag, ContextPtr context, bool strict);
bool canBePruned(const IMergeTreeDataPart & part);
bool canBePruned(const IMergeTreeDataPart & part) const;
bool isUseless() const { return useless; }
const KeyCondition & getKeyCondition() const { return partition_condition; }
private:
std::unordered_map<String, bool> partition_filter_map;
/// Cache already analyzed partitions.
mutable std::unordered_map<String, bool> partition_filter_map;
/// partition_key is adjusted here (with substitution from modulo to moduloLegacy).
KeyDescription partition_key;
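
Making canBePruned const works because the only state it touches, the per-partition cache, is declared mutable. A minimal sketch of this memoize-inside-a-const-method pattern (illustrative names, not the real class):

#include <iostream>
#include <string>
#include <unordered_map>

class Pruner
{
public:
    // Logically const: callers only ask a question; observable state is unchanged.
    bool canBePruned(const std::string & partition_id) const
    {
        if (auto it = partition_filter_map.find(partition_id); it != partition_filter_map.end())
            return it->second; // cache hit
        bool pruned = expensiveCheck(partition_id);
        partition_filter_map.emplace(partition_id, pruned); // allowed because the map is mutable
        return pruned;
    }

private:
    bool expensiveCheck(const std::string & partition_id) const { return partition_id < "202311"; }

    // Cache of already analyzed partitions, as in the header above.
    mutable std::unordered_map<std::string, bool> partition_filter_map;
};

int main()
{
    Pruner pruner;
    // The second call returns the memoized answer without re-running the check.
    std::cout << pruner.canBePruned("202307") << ' ' << pruner.canBePruned("202307") << '\n';
}
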

View File

@@ -341,6 +341,8 @@ void StorageMergeTree::alter(
prev_mutation = it->first;
}
/// Always wait previous mutations synchronously, because alters
/// should be executed in sequential order.
if (prev_mutation != 0)
{
LOG_DEBUG(log, "Cannot change metadata with barrier alter query, will wait for mutation {}", prev_mutation);
@@ -368,9 +370,7 @@ void StorageMergeTree::alter(
resetObjectColumnsFromActiveParts(parts_lock);
}
/// Always execute required mutations synchronously, because alters
/// should be executed in sequential order.
if (!maybe_mutation_commands.empty())
if (!maybe_mutation_commands.empty() && local_context->getSettingsRef().alter_sync > 0)
waitForMutation(mutation_version, false);
}

View File

@@ -0,0 +1 @@
Selected 2/2 parts by partition key, 1 parts by primary key, 1/2 marks by primary key, 1 marks to read from 1 ranges

View File

@@ -0,0 +1,16 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} -q "drop table if exists t"
${CLICKHOUSE_CLIENT} -q "create table t(s LowCardinality(String), e DateTime64(3), projection p1 (select * order by s, e)) engine MergeTree partition by toYYYYMM(e) order by tuple()"
${CLICKHOUSE_CLIENT} -q "insert into t select 'AAP', toDateTime('2023-07-01') + 360 * number from numbers(50000)"
${CLICKHOUSE_CLIENT} -q "insert into t select 'AAPL', toDateTime('2023-07-01') + 360 * number from numbers(50000)"
CLICKHOUSE_CLIENT_DEBUG_LOG=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=debug/g')
${CLICKHOUSE_CLIENT_DEBUG_LOG} -q "select count() from t where e >= '2023-11-08 00:00:00.000' and e < '2023-11-09 00:00:00.000' and s in ('AAPL') format Null" 2>&1 | grep -oh "Selected .* parts by partition key, *. parts by primary key, .* marks by primary key, .* marks to read from .* ranges.*$"
${CLICKHOUSE_CLIENT} -q "drop table t"

View File

@@ -0,0 +1,15 @@
drop table if exists t;
create table t (i int, j int, projection p (select i order by i)) engine MergeTree order by tuple();
insert into t values (1, 2);
system stop merges t;
set alter_sync = 0;
alter table t rename column j to k;
select * from t;
drop table t;