From d88299b94dc9563f494d6f1ad9374d1055ad13a8 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Wed, 14 Aug 2024 23:46:40 +0200 Subject: [PATCH 1/6] check that merge entries are valid --- src/Storages/MergeTree/MergeList.cpp | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/Storages/MergeTree/MergeList.cpp b/src/Storages/MergeTree/MergeList.cpp index 8fbb163384e..2d48ed4537a 100644 --- a/src/Storages/MergeTree/MergeList.cpp +++ b/src/Storages/MergeTree/MergeList.cpp @@ -10,6 +10,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMutatedPartPtr future_part, const ContextPtr & context) : table_id{table_id_} , partition_id{future_part->part_info.partition_id} @@ -21,8 +26,15 @@ MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMuta , merge_type{future_part->merge_type} , merge_algorithm{MergeAlgorithm::Undecided} { + auto format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING; + if (result_part_name != result_part_info.getPartNameV1()) + format_version = MERGE_TREE_DATA_OLD_FORMAT_VERSION; + for (const auto & source_part : future_part->parts) { + if (!result_part_info.contains(MergeTreePartInfo::fromPartName(source_part->name, format_version))) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Source part {} is not covered by result part {}", source_part->name, result_part_info.getPartNameV1()); + source_part_names.emplace_back(source_part->name); source_part_paths.emplace_back(source_part->getDataPartStorage().getFullPath()); @@ -42,6 +54,9 @@ MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMuta part->partition.serializeText(part->storage, out, {}); } + if (is_mutation && future_part->parts.size() != 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Got {} source parts for mutation {}", future_part->parts.size(), result_part_info.getPartNameV1()); + thread_group = ThreadGroup::createForBackgroundProcess(context); } From 88fa8cb5bb52c7efd2b8e132236ee10ba0dd33d4 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Thu, 15 Aug 2024 00:06:41 +0200 Subject: [PATCH 2/6] Update MergeList.cpp --- src/Storages/MergeTree/MergeList.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeList.cpp b/src/Storages/MergeTree/MergeList.cpp index 2d48ed4537a..1eb2e707194 100644 --- a/src/Storages/MergeTree/MergeList.cpp +++ b/src/Storages/MergeTree/MergeList.cpp @@ -32,7 +32,7 @@ MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMuta for (const auto & source_part : future_part->parts) { - if (!result_part_info.contains(MergeTreePartInfo::fromPartName(source_part->name, format_version))) + if (!source_part->getParentPart() && !result_part_info.contains(MergeTreePartInfo::fromPartName(source_part->name, format_version))) throw Exception(ErrorCodes::LOGICAL_ERROR, "Source part {} is not covered by result part {}", source_part->name, result_part_info.getPartNameV1()); source_part_names.emplace_back(source_part->name); From 3972991b1f364540927858e5f45bf519a1bba928 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Thu, 15 Aug 2024 23:49:49 +0200 Subject: [PATCH 3/6] Update MergeList.cpp --- src/Storages/MergeTree/MergeList.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeList.cpp b/src/Storages/MergeTree/MergeList.cpp index 1eb2e707194..19c8b2f084e 100644 --- a/src/Storages/MergeTree/MergeList.cpp +++ b/src/Storages/MergeTree/MergeList.cpp @@ -47,7 +47,7 @@ MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMuta if (!future_part->parts.empty()) { source_data_version = future_part->parts[0]->info.getDataVersion(); - is_mutation = (result_part_info.getDataVersion() != source_data_version); + is_mutation = (result_part_info.level == future_part->parts[0]->info.level); WriteBufferFromString out(partition); const auto & part = future_part->parts[0]; From 1049e366534635a510e9f0c769b5635a073a0c1c Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Sun, 18 Aug 2024 23:35:20 +0200 Subject: [PATCH 4/6] Update MergeList.cpp --- src/Storages/MergeTree/MergeList.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeList.cpp b/src/Storages/MergeTree/MergeList.cpp index 19c8b2f084e..2465222ae6c 100644 --- a/src/Storages/MergeTree/MergeList.cpp +++ b/src/Storages/MergeTree/MergeList.cpp @@ -30,8 +30,10 @@ MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMuta if (result_part_name != result_part_info.getPartNameV1()) format_version = MERGE_TREE_DATA_OLD_FORMAT_VERSION; + size_t normal_parts_count = 0; for (const auto & source_part : future_part->parts) { + normal_parts_count += !source_part->getParentPart(); if (!source_part->getParentPart() && !result_part_info.contains(MergeTreePartInfo::fromPartName(source_part->name, format_version))) throw Exception(ErrorCodes::LOGICAL_ERROR, "Source part {} is not covered by result part {}", source_part->name, result_part_info.getPartNameV1()); @@ -54,7 +56,7 @@ MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMuta part->partition.serializeText(part->storage, out, {}); } - if (is_mutation && future_part->parts.size() != 1) + if (is_mutation && normal_parts_count != 1) throw Exception(ErrorCodes::LOGICAL_ERROR, "Got {} source parts for mutation {}", future_part->parts.size(), result_part_info.getPartNameV1()); thread_group = ThreadGroup::createForBackgroundProcess(context); From 0b68517279dec4cdd1468719c846d17ef85629d2 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 20 Aug 2024 20:01:35 +0200 Subject: [PATCH 5/6] skip projections --- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 2 +- src/Storages/MergeTree/MergeList.cpp | 19 ++++++++++++++----- src/Storages/MergeTree/MergeList.h | 2 ++ src/Storages/MergeTree/MergeTask.cpp | 2 +- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 195aa4fdc10..a48a04b4476 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -807,7 +807,7 @@ MergeTreeDataPartBuilder IMergeTreeDataPart::getProjectionPartBuilder(const Stri const char * projection_extension = is_temp_projection ? ".tmp_proj" : ".proj"; auto projection_storage = getDataPartStorage().getProjection(projection_name + projection_extension, !is_temp_projection); MergeTreeDataPartBuilder builder(storage, projection_name, projection_storage); - return builder.withPartInfo({"all", 0, 0, 0}).withParentPart(this); + return builder.withPartInfo(MergeListElement::FAKE_RESULT_PART_FOR_PROJECTION).withParentPart(this); } void IMergeTreeDataPart::addProjectionPart( diff --git a/src/Storages/MergeTree/MergeList.cpp b/src/Storages/MergeTree/MergeList.cpp index 2465222ae6c..ed58b29d584 100644 --- a/src/Storages/MergeTree/MergeList.cpp +++ b/src/Storages/MergeTree/MergeList.cpp @@ -15,6 +15,8 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } +const MergeTreePartInfo MergeListElement::FAKE_RESULT_PART_FOR_PROJECTION = {"all", 0, 0, 0}; + MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMutatedPartPtr future_part, const ContextPtr & context) : table_id{table_id_} , partition_id{future_part->part_info.partition_id} @@ -30,12 +32,18 @@ MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMuta if (result_part_name != result_part_info.getPartNameV1()) format_version = MERGE_TREE_DATA_OLD_FORMAT_VERSION; + /// FIXME why do we need a merge list element for projection parts at all? + bool skip_sanity_checks = future_part->part_info == FAKE_RESULT_PART_FOR_PROJECTION; + size_t normal_parts_count = 0; for (const auto & source_part : future_part->parts) { - normal_parts_count += !source_part->getParentPart(); - if (!source_part->getParentPart() && !result_part_info.contains(MergeTreePartInfo::fromPartName(source_part->name, format_version))) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Source part {} is not covered by result part {}", source_part->name, result_part_info.getPartNameV1()); + if (!skip_sanity_checks && !source_part->getParentPart()) + { + ++normal_parts_count; + if (!result_part_info.contains(MergeTreePartInfo::fromPartName(source_part->name, format_version))) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Source part {} is not covered by result part {}", source_part->name, result_part_info.getPartNameV1()); + } source_part_names.emplace_back(source_part->name); source_part_paths.emplace_back(source_part->getDataPartStorage().getFullPath()); @@ -56,8 +64,9 @@ MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMuta part->partition.serializeText(part->storage, out, {}); } - if (is_mutation && normal_parts_count != 1) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Got {} source parts for mutation {}", future_part->parts.size(), result_part_info.getPartNameV1()); + if (!skip_sanity_checks && is_mutation && normal_parts_count != 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Got {} source parts for mutation {}: {}", future_part->parts.size(), + result_part_info.getPartNameV1(), fmt::join(source_part_names, ", ")); thread_group = ThreadGroup::createForBackgroundProcess(context); } diff --git a/src/Storages/MergeTree/MergeList.h b/src/Storages/MergeTree/MergeList.h index d40af6abf43..66190de0ef4 100644 --- a/src/Storages/MergeTree/MergeList.h +++ b/src/Storages/MergeTree/MergeList.h @@ -66,6 +66,8 @@ struct Settings; struct MergeListElement : boost::noncopyable { + static const MergeTreePartInfo FAKE_RESULT_PART_FOR_PROJECTION; + const StorageID table_id; std::string partition_id; std::string partition; diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 26cb821f33b..f7cb01653c5 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -889,7 +889,7 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c // TODO (ab): path in future_part is only for merge process introspection, which is not available for merges of projection parts. // Let's comment this out to avoid code inconsistency and add it back after we implement projection merge introspection. // projection_future_part->path = global_ctx->future_part->path + "/" + projection.name + ".proj/"; - projection_future_part->part_info = {"all", 0, 0, 0}; + projection_future_part->part_info = MergeListElement::FAKE_RESULT_PART_FOR_PROJECTION; MergeTreeData::MergingParams projection_merging_params; projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary; From 936bbe7d0d1da9c5ddecaf04eff45c3805d0429b Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 26 Aug 2024 20:02:03 +0200 Subject: [PATCH 6/6] handle trash parts correctly --- src/Storages/MergeTree/MergeList.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Storages/MergeTree/MergeList.cpp b/src/Storages/MergeTree/MergeList.cpp index ed58b29d584..b80d7fccc91 100644 --- a/src/Storages/MergeTree/MergeList.cpp +++ b/src/Storages/MergeTree/MergeList.cpp @@ -6,6 +6,7 @@ #include #include +#include namespace DB { @@ -33,12 +34,12 @@ MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMuta format_version = MERGE_TREE_DATA_OLD_FORMAT_VERSION; /// FIXME why do we need a merge list element for projection parts at all? - bool skip_sanity_checks = future_part->part_info == FAKE_RESULT_PART_FOR_PROJECTION; + bool is_fake_projection_part = future_part->part_info == FAKE_RESULT_PART_FOR_PROJECTION; size_t normal_parts_count = 0; for (const auto & source_part : future_part->parts) { - if (!skip_sanity_checks && !source_part->getParentPart()) + if (!is_fake_projection_part && !source_part->getParentPart()) { ++normal_parts_count; if (!result_part_info.contains(MergeTreePartInfo::fromPartName(source_part->name, format_version))) @@ -57,14 +58,14 @@ MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMuta if (!future_part->parts.empty()) { source_data_version = future_part->parts[0]->info.getDataVersion(); - is_mutation = (result_part_info.level == future_part->parts[0]->info.level); + is_mutation = (result_part_info.level == future_part->parts[0]->info.level) && !is_fake_projection_part; WriteBufferFromString out(partition); const auto & part = future_part->parts[0]; part->partition.serializeText(part->storage, out, {}); } - if (!skip_sanity_checks && is_mutation && normal_parts_count != 1) + if (!is_fake_projection_part && is_mutation && normal_parts_count != 1) throw Exception(ErrorCodes::LOGICAL_ERROR, "Got {} source parts for mutation {}: {}", future_part->parts.size(), result_part_info.getPartNameV1(), fmt::join(source_part_names, ", "));