Merge pull request #68366 from ClickHouse/check_merge_entries

Check that merge entries are valid
This commit is contained in:
Alexander Tokmakov 2024-08-27 11:17:11 +00:00 committed by GitHub
commit 60a0ea5746
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 32 additions and 3 deletions

View File

@ -807,7 +807,7 @@ MergeTreeDataPartBuilder IMergeTreeDataPart::getProjectionPartBuilder(const Stri
const char * projection_extension = is_temp_projection ? ".tmp_proj" : ".proj"; const char * projection_extension = is_temp_projection ? ".tmp_proj" : ".proj";
auto projection_storage = getDataPartStorage().getProjection(projection_name + projection_extension, !is_temp_projection); auto projection_storage = getDataPartStorage().getProjection(projection_name + projection_extension, !is_temp_projection);
MergeTreeDataPartBuilder builder(storage, projection_name, projection_storage); MergeTreeDataPartBuilder builder(storage, projection_name, projection_storage);
return builder.withPartInfo({"all", 0, 0, 0}).withParentPart(this); return builder.withPartInfo(MergeListElement::FAKE_RESULT_PART_FOR_PROJECTION).withParentPart(this);
} }
void IMergeTreeDataPart::addProjectionPart( void IMergeTreeDataPart::addProjectionPart(

View File

@ -6,10 +6,18 @@
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <Common/MemoryTracker.h> #include <Common/MemoryTracker.h>
#include <Common/logger_useful.h>
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
const MergeTreePartInfo MergeListElement::FAKE_RESULT_PART_FOR_PROJECTION = {"all", 0, 0, 0};
MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMutatedPartPtr future_part, const ContextPtr & context) MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMutatedPartPtr future_part, const ContextPtr & context)
: table_id{table_id_} : table_id{table_id_}
, partition_id{future_part->part_info.partition_id} , partition_id{future_part->part_info.partition_id}
@ -21,8 +29,23 @@ MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMuta
, merge_type{future_part->merge_type} , merge_type{future_part->merge_type}
, merge_algorithm{MergeAlgorithm::Undecided} , merge_algorithm{MergeAlgorithm::Undecided}
{ {
auto format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING;
if (result_part_name != result_part_info.getPartNameV1())
format_version = MERGE_TREE_DATA_OLD_FORMAT_VERSION;
/// FIXME why do we need a merge list element for projection parts at all?
bool is_fake_projection_part = future_part->part_info == FAKE_RESULT_PART_FOR_PROJECTION;
size_t normal_parts_count = 0;
for (const auto & source_part : future_part->parts) for (const auto & source_part : future_part->parts)
{ {
if (!is_fake_projection_part && !source_part->getParentPart())
{
++normal_parts_count;
if (!result_part_info.contains(MergeTreePartInfo::fromPartName(source_part->name, format_version)))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Source part {} is not covered by result part {}", source_part->name, result_part_info.getPartNameV1());
}
source_part_names.emplace_back(source_part->name); source_part_names.emplace_back(source_part->name);
source_part_paths.emplace_back(source_part->getDataPartStorage().getFullPath()); source_part_paths.emplace_back(source_part->getDataPartStorage().getFullPath());
@ -35,13 +58,17 @@ MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMuta
if (!future_part->parts.empty()) if (!future_part->parts.empty())
{ {
source_data_version = future_part->parts[0]->info.getDataVersion(); source_data_version = future_part->parts[0]->info.getDataVersion();
is_mutation = (result_part_info.getDataVersion() != source_data_version); is_mutation = (result_part_info.level == future_part->parts[0]->info.level) && !is_fake_projection_part;
WriteBufferFromString out(partition); WriteBufferFromString out(partition);
const auto & part = future_part->parts[0]; const auto & part = future_part->parts[0];
part->partition.serializeText(part->storage, out, {}); part->partition.serializeText(part->storage, out, {});
} }
if (!is_fake_projection_part && is_mutation && normal_parts_count != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got {} source parts for mutation {}: {}", future_part->parts.size(),
result_part_info.getPartNameV1(), fmt::join(source_part_names, ", "));
thread_group = ThreadGroup::createForBackgroundProcess(context); thread_group = ThreadGroup::createForBackgroundProcess(context);
} }

View File

@ -66,6 +66,8 @@ struct Settings;
struct MergeListElement : boost::noncopyable struct MergeListElement : boost::noncopyable
{ {
static const MergeTreePartInfo FAKE_RESULT_PART_FOR_PROJECTION;
const StorageID table_id; const StorageID table_id;
std::string partition_id; std::string partition_id;
std::string partition; std::string partition;

View File

@ -1021,7 +1021,7 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
// TODO (ab): path in future_part is only for merge process introspection, which is not available for merges of projection parts. // TODO (ab): path in future_part is only for merge process introspection, which is not available for merges of projection parts.
// Let's comment this out to avoid code inconsistency and add it back after we implement projection merge introspection. // Let's comment this out to avoid code inconsistency and add it back after we implement projection merge introspection.
// projection_future_part->path = global_ctx->future_part->path + "/" + projection.name + ".proj/"; // projection_future_part->path = global_ctx->future_part->path + "/" + projection.name + ".proj/";
projection_future_part->part_info = {"all", 0, 0, 0}; projection_future_part->part_info = MergeListElement::FAKE_RESULT_PART_FOR_PROJECTION;
MergeTreeData::MergingParams projection_merging_params; MergeTreeData::MergingParams projection_merging_params;
projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary; projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary;