Primary key analysis for _part_offset

This commit is contained in:
Amos Bird 2023-12-26 16:56:52 +08:00
parent 85b149395a
commit bfcccf9fa3
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
7 changed files with 155 additions and 21 deletions

View File

@ -1338,7 +1338,7 @@ static void buildIndexes(
context,
primary_key_column_names,
primary_key.expression,
array_join_name_set}, {}, {}, {}, false, {}});
array_join_name_set}, {}, {}, {}, {}, false, {}});
}
else
{
@ -1346,7 +1346,7 @@ static void buildIndexes(
query_info,
context,
primary_key_column_names,
primary_key.expression}, {}, {}, {}, false, {}});
primary_key.expression}, {}, {}, {}, {}, false, {}});
}
if (metadata_snapshot->hasPartitionKey())
@ -1365,6 +1365,8 @@ static void buildIndexes(
else
indexes->part_values = MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(data, parts, query_info.query, context);
MergeTreeDataSelectExecutor::buildKeyConditionFromPartOffset(indexes->part_offset_condition, filter_actions_dag, context);
indexes->use_skip_indexes = settings.use_skip_indexes;
bool final = query_info.isFinal();
@ -1549,6 +1551,9 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
}
LOG_DEBUG(log, "Key condition: {}", indexes->key_condition.toString());
if (indexes->part_offset_condition)
LOG_DEBUG(log, "Part offset condition: {}", indexes->part_offset_condition->toString());
if (indexes->key_condition.alwaysFalse())
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
@ -1595,6 +1600,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
metadata_snapshot,
context,
indexes->key_condition,
indexes->part_offset_condition,
indexes->skip_indexes,
reader_settings,
log,

View File

@ -151,6 +151,7 @@ public:
KeyCondition key_condition;
std::optional<PartitionPruner> partition_pruner;
std::optional<KeyCondition> minmax_idx_condition;
std::optional<KeyCondition> part_offset_condition;
UsefulSkipIndexes skip_indexes;
bool use_skip_indexes;
std::optional<std::unordered_set<String>> part_values;

View File

@ -92,7 +92,7 @@ size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
for (const auto & part : parts)
{
MarkRanges ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings, log);
MarkRanges ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, {}, settings, log);
/** In order to get a lower bound on the number of rows that match the condition on PK,
* consider only guaranteed full marks.
@ -770,6 +770,35 @@ MergeTreeDataSelectSamplingData MergeTreeDataSelectExecutor::getSampling(
return sampling;
}
void MergeTreeDataSelectExecutor::buildKeyConditionFromPartOffset(
std::optional<KeyCondition> & part_offset_condition, const ActionsDAGPtr & filter_dag, ContextPtr context)
{
if (!filter_dag)
return;
auto part_offset_type = std::make_shared<DataTypeUInt64>();
auto part_type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
Block sample
= {ColumnWithTypeAndName(part_offset_type->createColumn(), part_offset_type, "_part_offset"),
ColumnWithTypeAndName(part_type->createColumn(), part_type, "_part")};
auto dag = VirtualColumnUtils::splitFilterDagForAllowedInputs(filter_dag->getOutputs().at(0), sample);
if (!dag)
return;
/// The _part filter should only be effective in conjunction with the _part_offset filter.
auto required_columns = dag->getRequiredColumnsNames();
if (std::find(required_columns.begin(), required_columns.end(), "_part_offset") == required_columns.end())
return;
part_offset_condition.emplace(KeyCondition{
dag,
context,
sample.getNames(),
std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>(sample.getColumnsWithTypeAndName()), ExpressionActionsSettings{}),
{}});
}
std::optional<std::unordered_set<String>> MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(
const MergeTreeData & data,
const MergeTreeData::DataPartsVector & parts,
@ -909,6 +938,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
StorageMetadataPtr metadata_snapshot,
const ContextPtr & context,
const KeyCondition & key_condition,
const std::optional<KeyCondition> & part_offset_condition,
const UsefulSkipIndexes & skip_indexes,
const MergeTreeReaderSettings & reader_settings,
Poco::Logger * log,
@ -983,8 +1013,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
RangesInDataPart ranges(part, alter_conversions_for_part, part_index);
size_t total_marks_count = part->index_granularity.getMarksCountWithoutFinal();
if (metadata_snapshot->hasPrimaryKey())
ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings, log);
if (metadata_snapshot->hasPrimaryKey() || part_offset_condition)
ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, part_offset_condition, settings, log);
else if (total_marks_count)
ranges.ranges = MarkRanges{{MarkRange{0, total_marks_count}}};
@ -1404,6 +1434,7 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
const MergeTreeData::DataPartPtr & part,
const StorageMetadataPtr & metadata_snapshot,
const KeyCondition & key_condition,
const std::optional<KeyCondition> & part_offset_condition,
const Settings & settings,
Poco::Logger * log)
{
@ -1417,7 +1448,7 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
bool has_final_mark = part->index_granularity.hasFinalMark();
/// If index is not used.
if (key_condition.alwaysUnknownOrTrue())
if (key_condition.alwaysUnknownOrTrue() && (!part_offset_condition || part_offset_condition->alwaysUnknownOrTrue()))
{
if (has_final_mark)
res.push_back(MarkRange(0, marks_count - 1));
@ -1467,32 +1498,69 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
std::vector<FieldRef> index_left(used_key_size);
std::vector<FieldRef> index_right(used_key_size);
/// For _part_offset and _part virtual columns
DataTypes part_offset_types
= {std::make_shared<DataTypeUInt64>(), std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())};
std::vector<FieldRef> part_offset_left(2);
std::vector<FieldRef> part_offset_right(2);
auto may_be_true_in_range = [&](MarkRange & range)
{
if (range.end == marks_count && !has_final_mark)
bool key_condition_maybe_true = true;
if (!key_condition.alwaysUnknownOrTrue())
{
for (size_t i = 0; i < used_key_size; ++i)
if (range.end == marks_count && !has_final_mark)
{
create_field_ref(range.begin, i, index_left[i]);
index_right[i] = POSITIVE_INFINITY;
for (size_t i = 0; i < used_key_size; ++i)
{
create_field_ref(range.begin, i, index_left[i]);
index_right[i] = POSITIVE_INFINITY;
}
}
}
else
{
if (has_final_mark && range.end == marks_count)
range.end -= 1; /// Remove final empty mark. It's useful only for primary key condition.
else
{
if (has_final_mark && range.end == marks_count)
range.end -= 1; /// Remove final empty mark. It's useful only for primary key condition.
for (size_t i = 0; i < used_key_size; ++i)
for (size_t i = 0; i < used_key_size; ++i)
{
create_field_ref(range.begin, i, index_left[i]);
create_field_ref(range.end, i, index_right[i]);
}
}
key_condition_maybe_true = key_condition.mayBeTrueInRange(used_key_size, index_left.data(), index_right.data(), key_types);
}
bool part_offset_condition_maybe_true = true;
if (part_offset_condition && !part_offset_condition->alwaysUnknownOrTrue())
{
auto begin = part->index_granularity.getMarkStartingRow(range.begin);
auto end = part->index_granularity.getMarkStartingRow(range.end) - 1;
if (begin > end)
{
create_field_ref(range.begin, i, index_left[i]);
create_field_ref(range.end, i, index_right[i]);
/// Empty mark (final mark)
part_offset_condition_maybe_true = false;
}
else
{
part_offset_left[0] = part->index_granularity.getMarkStartingRow(range.begin);
part_offset_right[0] = part->index_granularity.getMarkStartingRow(range.end) - 1;
part_offset_left[1] = part->name;
part_offset_right[1] = part->name;
part_offset_condition_maybe_true
= part_offset_condition->mayBeTrueInRange(2, part_offset_left.data(), part_offset_right.data(), part_offset_types);
}
}
return key_condition.mayBeTrueInRange(used_key_size, index_left.data(), index_right.data(), key_types);
return key_condition_maybe_true && part_offset_condition_maybe_true;
};
bool key_condition_exact_range = key_condition.alwaysUnknownOrTrue() || key_condition.matchesExactContinuousRange();
bool part_offset_condition_exact_range
= !part_offset_condition || part_offset_condition->alwaysUnknownOrTrue() || part_offset_condition->matchesExactContinuousRange();
const String & part_name = part->isProjectionPart() ? fmt::format("{}.{}", part->name, part->getParentPart()->name) : part->name;
if (!key_condition.matchesExactContinuousRange())
if (!key_condition_exact_range || !part_offset_condition_exact_range)
{
// Do exclusion search, where we drop ranges that do not match

View File

@ -71,6 +71,7 @@ public:
const MergeTreeData::DataPartPtr & part,
const StorageMetadataPtr & metadata_snapshot,
const KeyCondition & key_condition,
const std::optional<KeyCondition> & part_offset_condition,
const Settings & settings,
Poco::Logger * log);
@ -161,6 +162,9 @@ public:
size_t bytes_granularity,
size_t max_marks);
static void buildKeyConditionFromPartOffset(
std::optional<KeyCondition> & part_offset_condition, const ActionsDAGPtr & filter_dag, ContextPtr context);
/// If possible, filter using expression on virtual columns.
/// Example: SELECT count() FROM table WHERE _part = 'part_name'
/// If expression found, return a set with allowed part names (std::nullopt otherwise).
@ -199,6 +203,7 @@ public:
StorageMetadataPtr metadata_snapshot,
const ContextPtr & context,
const KeyCondition & key_condition,
const std::optional<KeyCondition> & part_offset_condition,
const UsefulSkipIndexes & skip_indexes,
const MergeTreeReaderSettings & reader_settings,
Poco::Logger * log,

View File

@ -322,7 +322,7 @@ public:
if (!key_condition.alwaysFalse())
mark_ranges = MergeTreeDataSelectExecutor::markRangesFromPKRange(
data_part, metadata_snapshot, key_condition, context->getSettingsRef(), log);
data_part, metadata_snapshot, key_condition, {}, context->getSettingsRef(), log);
if (mark_ranges && mark_ranges->empty())
{

View File

@ -0,0 +1,14 @@
-4
-3
-2
-1
0
-3
0
-4
-2
-1
0
10
40
400

View File

@ -0,0 +1,40 @@
drop table if exists a;
create table a (i int) engine MergeTree order by i settings index_granularity = 2;
insert into a select -number from numbers(5);
-- nothing to read
select i from a where _part_offset >= 5 order by i settings max_bytes_to_read = 1;
-- one granules
select i from a where _part_offset = 0 order by i settings max_rows_to_read = 2;
select i from a where _part_offset = 1 order by i settings max_rows_to_read = 2;
select i from a where _part_offset = 2 order by i settings max_rows_to_read = 2;
select i from a where _part_offset = 3 order by i settings max_rows_to_read = 2;
select i from a where _part_offset = 4 order by i settings max_rows_to_read = 1;
-- other predicates
select i from a where _part_offset in (1, 4) order by i settings max_rows_to_read = 3;
select i from a where _part_offset not in (1, 4) order by i settings max_rows_to_read = 4;
-- force primary key check still works
select i from a where _part_offset = 4 order by i settings force_primary_key = 1; -- { serverError INDEX_NOT_USED }
-- combining with other primary keys doesn't work (makes no sense)
select i from a where i = -3 or _part_offset = 4 order by i settings force_primary_key = 1; -- { serverError INDEX_NOT_USED }
drop table a;
drop table if exists b;
create table b (i int) engine MergeTree order by tuple() settings index_granularity = 2;
-- all_1_1_0
insert into b select number * 10 from numbers(5);
-- all_2_2_0
insert into b select number * 100 from numbers(5);
-- multiple parts with _part predicate
select i from b where (_part = 'all_1_1_0' and _part_offset in (1, 4)) or (_part = 'all_2_2_0' and _part_offset in (0, 4)) order by i settings max_rows_to_read = 6;
drop table b;