diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index a1565173b47..a20e5fc7264 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -807,7 +807,7 @@ MergeTreeDataPartBuilder IMergeTreeDataPart::getProjectionPartBuilder(const Stri const char * projection_extension = is_temp_projection ? ".tmp_proj" : ".proj"; auto projection_storage = getDataPartStorage().getProjection(projection_name + projection_extension, !is_temp_projection); MergeTreeDataPartBuilder builder(storage, projection_name, projection_storage); - return builder.withPartInfo({"all", 0, 0, 0}).withParentPart(this); + return builder.withPartInfo(MergeListElement::FAKE_RESULT_PART_FOR_PROJECTION).withParentPart(this); } void IMergeTreeDataPart::addProjectionPart( diff --git a/src/Storages/MergeTree/MergeList.cpp b/src/Storages/MergeTree/MergeList.cpp index 8fbb163384e..b80d7fccc91 100644 --- a/src/Storages/MergeTree/MergeList.cpp +++ b/src/Storages/MergeTree/MergeList.cpp @@ -6,10 +6,18 @@ #include #include +#include namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +const MergeTreePartInfo MergeListElement::FAKE_RESULT_PART_FOR_PROJECTION = {"all", 0, 0, 0}; + MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMutatedPartPtr future_part, const ContextPtr & context) : table_id{table_id_} , partition_id{future_part->part_info.partition_id} @@ -21,8 +29,23 @@ MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMuta , merge_type{future_part->merge_type} , merge_algorithm{MergeAlgorithm::Undecided} { + auto format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING; + if (result_part_name != result_part_info.getPartNameV1()) + format_version = MERGE_TREE_DATA_OLD_FORMAT_VERSION; + + /// FIXME why do we need a merge list element for projection parts at all? + bool is_fake_projection_part = future_part->part_info == FAKE_RESULT_PART_FOR_PROJECTION; + + size_t normal_parts_count = 0; for (const auto & source_part : future_part->parts) { + if (!is_fake_projection_part && !source_part->getParentPart()) + { + ++normal_parts_count; + if (!result_part_info.contains(MergeTreePartInfo::fromPartName(source_part->name, format_version))) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Source part {} is not covered by result part {}", source_part->name, result_part_info.getPartNameV1()); + } + source_part_names.emplace_back(source_part->name); source_part_paths.emplace_back(source_part->getDataPartStorage().getFullPath()); @@ -35,13 +58,17 @@ MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMuta if (!future_part->parts.empty()) { source_data_version = future_part->parts[0]->info.getDataVersion(); - is_mutation = (result_part_info.getDataVersion() != source_data_version); + is_mutation = (result_part_info.level == future_part->parts[0]->info.level) && !is_fake_projection_part; WriteBufferFromString out(partition); const auto & part = future_part->parts[0]; part->partition.serializeText(part->storage, out, {}); } + if (!is_fake_projection_part && is_mutation && normal_parts_count != 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Got {} source parts for mutation {}: {}", future_part->parts.size(), + result_part_info.getPartNameV1(), fmt::join(source_part_names, ", ")); + thread_group = ThreadGroup::createForBackgroundProcess(context); } diff --git a/src/Storages/MergeTree/MergeList.h b/src/Storages/MergeTree/MergeList.h index d40af6abf43..66190de0ef4 100644 --- a/src/Storages/MergeTree/MergeList.h +++ b/src/Storages/MergeTree/MergeList.h @@ -66,6 +66,8 @@ struct Settings; struct MergeListElement : boost::noncopyable { + static const MergeTreePartInfo FAKE_RESULT_PART_FOR_PROJECTION; + const StorageID table_id; std::string partition_id; std::string partition; diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index aa178be270f..fa86bb31629 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -1021,7 +1021,7 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c // TODO (ab): path in future_part is only for merge process introspection, which is not available for merges of projection parts. // Let's comment this out to avoid code inconsistency and add it back after we implement projection merge introspection. // projection_future_part->path = global_ctx->future_part->path + "/" + projection.name + ".proj/"; - projection_future_part->part_info = {"all", 0, 0, 0}; + projection_future_part->part_info = MergeListElement::FAKE_RESULT_PART_FOR_PROJECTION; MergeTreeData::MergingParams projection_merging_params; projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary;