Merge pull request #44468 from ClickHouse/fix_get_part_name

Fix incorrect usages of `getPartName()`
This commit is contained in:
Alexander Tokmakov 2023-01-11 16:24:02 +03:00 committed by GitHub
commit 0fba3d6d81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 58 additions and 35 deletions

View File

@ -78,9 +78,9 @@ public:
throw Exception(
ErrorCodes::CANNOT_BACKUP_TABLE,
"Intersected parts detected: {} on replica {} and {} on replica {}",
part.info.getPartName(),
part.info.getPartNameForLogs(),
*part.replica_name,
new_part_info.getPartName(),
new_part_info.getPartNameForLogs(),
*replica_name);
}
++last_it;

View File

@ -47,7 +47,7 @@ bool ActiveDataPartSet::add(const MergeTreePartInfo & part_info, const String &
if (!part_info.contains(it->first))
{
if (!part_info.isDisjoint(it->first))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects previous part {}. It is a bug or a result of manual intervention in the ZooKeeper data.", part_info.getPartName(), it->first.getPartName());
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects previous part {}. It is a bug or a result of manual intervention in the ZooKeeper data.", part_info.getPartNameForLogs(), it->first.getPartNameForLogs());
++it;
break;
}
@ -70,7 +70,7 @@ bool ActiveDataPartSet::add(const MergeTreePartInfo & part_info, const String &
}
if (it != part_info_to_name.end() && !part_info.isDisjoint(it->first))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects part {}. It is a bug or a result of manual intervention in the ZooKeeper data.", name, it->first.getPartName());
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects part {}. It is a bug or a result of manual intervention in the ZooKeeper data.", name, it->first.getPartNameForLogs());
part_info_to_name.emplace(part_info, name);
return true;
@ -79,7 +79,7 @@ bool ActiveDataPartSet::add(const MergeTreePartInfo & part_info, const String &
bool ActiveDataPartSet::add(const MergeTreePartInfo & part_info, Strings * out_replaced_parts)
{
return add(part_info, part_info.getPartName(), out_replaced_parts);
return add(part_info, part_info.getPartNameAndCheckFormat(format_version), out_replaced_parts);
}

View File

@ -19,7 +19,7 @@ bool DropPartsRanges::isAffectedByDropRange(const std::string & new_part_name, s
{
if (!drop_range.isDisjoint(entry_info))
{
postpone_reason = fmt::format("Has DROP RANGE affecting entry {} producing part {}. Will postpone it's execution.", drop_range.getPartName(), new_part_name);
postpone_reason = fmt::format("Has DROP RANGE affecting entry {} producing part {}. Will postpone it's execution.", drop_range.getPartNameForLogs(), new_part_name);
return true;
}
}

View File

@ -81,7 +81,7 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_, Merg
name = part_info.getPartNameV0(min_date, max_date);
}
else
name = part_info.getPartName();
name = part_info.getPartNameV1();
}
void FutureMergedMutatedPart::updatePath(const MergeTreeData & storage, const IReservation * reservation)

View File

@ -375,7 +375,7 @@ String IMergeTreeDataPart::getNewName(const MergeTreePartInfo & new_part_info) c
return new_part_info.getPartNameV0(min_date, max_date);
}
else
return new_part_info.getPartName();
return new_part_info.getPartNameV1();
}
std::optional<size_t> IMergeTreeDataPart::getColumnPosition(const String & column_name) const

View File

@ -2054,7 +2054,7 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
MergeTreePartInfo range_info = part->info;
range_info.level = static_cast<UInt32>(range_info.max_block - range_info.min_block);
range_info.mutation = 0;
independent_ranges_set.add(range_info, range_info.getPartName());
independent_ranges_set.add(range_info, range_info.getPartNameV1());
}
auto independent_ranges_infos = independent_ranges_set.getPartInfos();
@ -2080,7 +2080,7 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
LOG_TRACE(log, "Removing {} parts in blocks range {}", batch.size(), range.getPartName());
LOG_TRACE(log, "Removing {} parts in blocks range {}", batch.size(), range.getPartNameForLogs());
for (const auto & part : batch)
{
@ -3405,7 +3405,7 @@ DataPartsVector MergeTreeData::grabActivePartsToRemoveForDropRange(
DataPartsVector parts_to_remove;
if (drop_range.min_block > drop_range.max_block)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid drop range: {}", drop_range.getPartName());
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid drop range: {}", drop_range.getPartNameForLogs());
auto partition_range = getVisibleDataPartsVectorInPartition(txn, drop_range.partition_id, &lock);
@ -3437,7 +3437,7 @@ DataPartsVector MergeTreeData::grabActivePartsToRemoveForDropRange(
bool is_covered_by_min_max_block = part->info.min_block <= drop_range.min_block && part->info.max_block >= drop_range.max_block && part->info.getMutationVersion() >= drop_range.getMutationVersion();
if (is_covered_by_min_max_block)
{
LOG_INFO(log, "Skipping drop range for part {} because covering part {} already exists", drop_range.getPartName(), part->name);
LOG_INFO(log, "Skipping drop range for part {} because covering part {} already exists", drop_range.getPartNameForLogs(), part->name);
return {};
}
}
@ -3448,7 +3448,7 @@ DataPartsVector MergeTreeData::grabActivePartsToRemoveForDropRange(
{
/// Intersect left border
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected merged part {} intersecting drop range {}",
part->name, drop_range.getPartName());
part->name, drop_range.getPartNameForLogs());
}
continue;
@ -3462,7 +3462,7 @@ DataPartsVector MergeTreeData::grabActivePartsToRemoveForDropRange(
{
/// Intersect right border
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected merged part {} intersecting drop range {}",
part->name, drop_range.getPartName());
part->name, drop_range.getPartNameForLogs());
}
parts_to_remove.emplace_back(part);
@ -4241,8 +4241,8 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
{
auto part_info = MergeTreePartInfo::fromPartName(partition_id, format_version);
parts.push_back(getActiveContainingPart(part_info));
if (!parts.back() || parts.back()->name != part_info.getPartName())
throw Exception("Part " + partition_id + " is not exists or not active", ErrorCodes::NO_SUCH_DATA_PART);
if (!parts.back() || parts.back()->name != part_info.getPartNameAndCheckFormat(format_version))
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "Part {} is not exists or not active", partition_id);
}
else
parts = getVisibleDataPartsVectorInPartition(local_context, partition_id);
@ -4283,18 +4283,18 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
{
auto part_info = MergeTreePartInfo::fromPartName(partition_id, format_version);
parts.emplace_back(getActiveContainingPart(part_info));
if (!parts.back() || parts.back()->name != part_info.getPartName())
throw Exception("Part " + partition_id + " is not exists or not active", ErrorCodes::NO_SUCH_DATA_PART);
if (!parts.back() || parts.back()->name != part_info.getPartNameAndCheckFormat(format_version))
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "Part {} is not exists or not active", partition_id);
}
else
parts = getVisibleDataPartsVectorInPartition(local_context, partition_id);
auto volume = getStoragePolicy()->getVolumeByName(name);
if (!volume)
throw Exception("Volume " + name + " does not exists on policy " + getStoragePolicy()->getName(), ErrorCodes::UNKNOWN_DISK);
throw Exception(ErrorCodes::UNKNOWN_DISK, "Volume {} does not exists on policy {}", name, getStoragePolicy()->getName());
if (parts.empty())
throw Exception("Nothing to move (check that the partition exists).", ErrorCodes::NO_SUCH_DATA_PART);
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "Nothing to move (check that the partition exists).");
std::erase_if(parts, [&](auto part_ptr)
{
@ -4661,7 +4661,7 @@ void MergeTreeData::restorePartsFromBackup(RestorerFromBackup & restorer, const
void MergeTreeData::restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> restored_parts_holder, const MergeTreePartInfo & part_info, const String & part_path_in_backup) const
{
String part_name = part_info.getPartName();
String part_name = part_info.getPartNameAndCheckFormat(format_version);
auto backup = restored_parts_holder->getBackup();
UInt64 total_size_of_part = 0;

View File

@ -368,7 +368,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
part_name = new_part_info.getPartNameV0(min_date, max_date);
}
else
part_name = new_part_info.getPartName();
part_name = new_part_info.getPartNameV1();
std::string part_dir;
if (need_tmp_prefix)

View File

@ -232,7 +232,7 @@ std::pair<MergeTreePartInfo, bool> MergeTreeDeduplicationLog::addPart(const std:
/// Create new record
MergeTreeDeduplicationLogRecord record;
record.operation = MergeTreeDeduplicationOp::ADD;
record.part_name = part_info.getPartName();
record.part_name = part_info.getPartNameAndCheckFormat(format_version);
record.block_id = block_id;
/// Write it to disk
writeRecord(record, *current_writer);
@ -269,7 +269,7 @@ void MergeTreeDeduplicationLog::dropPart(const MergeTreePartInfo & drop_part_inf
/// Create drop record
MergeTreeDeduplicationLogRecord record;
record.operation = MergeTreeDeduplicationOp::DROP;
record.part_name = part_info.getPartName();
record.part_name = part_info.getPartNameAndCheckFormat(format_version);
record.block_id = itr->key;
/// Write it to disk
writeRecord(record, *current_writer);

View File

@ -167,7 +167,25 @@ bool MergeTreePartInfo::contains(const String & outer_part_name, const String &
}
String MergeTreePartInfo::getPartName() const
String MergeTreePartInfo::getPartNameAndCheckFormat(MergeTreeDataFormatVersion format_version) const
{
if (format_version == MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
return getPartNameV1();
/// We cannot just call getPartNameV0 because it requires extra arguments, but at least we can warn about it.
chassert(false); /// Catch it in CI. Feel free to remove this line.
throw Exception(ErrorCodes::BAD_DATA_PART_NAME, "Trying to get part name in new format for old format version. "
"Either some new feature is incompatible with deprecated *MergeTree definition syntax or it's a bug.");
}
String MergeTreePartInfo::getPartNameForLogs() const
{
/// We don't care about format version here
return getPartNameV1();
}
String MergeTreePartInfo::getPartNameV1() const
{
WriteBufferFromOwnString wb;

View File

@ -103,7 +103,9 @@ struct MergeTreePartInfo
return level == MergeTreePartInfo::MAX_LEVEL || level == another_max_level;
}
String getPartName() const;
String getPartNameAndCheckFormat(MergeTreeDataFormatVersion format_version) const;
String getPartNameForLogs() const;
String getPartNameV1() const;
String getPartNameV0(DayNum left_date, DayNum right_date) const;
UInt64 getBlocksCount() const
{

View File

@ -156,7 +156,7 @@ void MergeTreeSink::finishDelayedChunk()
if (!res.second)
{
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
LOG_INFO(storage.log, "Block with ID {} already exists as part {}; ignoring it", block_id, res.first.getPartName());
LOG_INFO(storage.log, "Block with ID {} already exists as part {}; ignoring it", block_id, res.first.getPartNameForLogs());
continue;
}
}

View File

@ -473,7 +473,7 @@ PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::st
log_entry.log_entry_id = attach_log_entry_barrier_path;
log_entry.part_checksum = part->checksums.getTotalChecksumHex();
log_entry.create_time = std::time(nullptr);
log_entry.new_part_name = part_info.getPartName();
log_entry.new_part_name = part_info.getPartNameAndCheckFormat(storage.format_version);
ops.emplace_back(zkutil::makeCreateRequest(attach_log_entry_barrier_path, log_entry.toString(), -1));
ops.emplace_back(zkutil::makeSetRequest(entry.to_shard + "/log", "", -1));

View File

@ -1504,7 +1504,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
entry.znode_name,
entry.typeToString(),
entry.new_part_name,
info.getPartName());
info.getPartNameForLogs());
LOG_TRACE(log, fmt::runtime(out_postpone_reason));
return false;
}

View File

@ -1490,8 +1490,11 @@ String StorageReplicatedMergeTree::getChecksumsForZooKeeper(const MergeTreeDataP
MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::attachPartHelperFoundValidPart(const LogEntry& entry) const
{
if (format_version != MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
return {};
const MergeTreePartInfo actual_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
const String part_new_name = actual_part_info.getPartName();
const String part_new_name = actual_part_info.getPartNameV1();
for (const DiskPtr & disk : getStoragePolicy()->getDisks())
{
@ -1502,7 +1505,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::attachPartHelperFo
if (!part_info || part_info->partition_id != actual_part_info.partition_id)
continue;
const String part_old_name = part_info->getPartName();
const String part_old_name = part_info->getPartNameV1();
const VolumePtr volume = std::make_shared<SingleDiskVolume>("volume_" + part_old_name, disk);
@ -3892,7 +3895,7 @@ void StorageReplicatedMergeTree::cleanLastPartNode(const String & partition_id)
bool StorageReplicatedMergeTree::partIsInsertingWithParallelQuorum(const MergeTreePartInfo & part_info) const
{
auto zookeeper = getZooKeeper();
return zookeeper->exists(fs::path(zookeeper_path) / "quorum" / "parallel" / part_info.getPartName());
return zookeeper->exists(fs::path(zookeeper_path) / "quorum" / "parallel" / part_info.getPartNameAndCheckFormat(format_version));
}
@ -3914,7 +3917,7 @@ bool StorageReplicatedMergeTree::partIsLastQuorumPart(const MergeTreePartInfo &
if (partition_it == parts_with_quorum.added_parts.end())
return false;
return partition_it->second == part_info.getPartName();
return partition_it->second == part_info.getPartNameAndCheckFormat(format_version);
}
@ -5230,7 +5233,7 @@ String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const
return part_info.getPartNameV0(left_date, right_date);
}
return part_info.getPartName();
return part_info.getPartNameV1();
}
bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(
@ -7725,7 +7728,7 @@ void StorageReplicatedMergeTree::enqueuePartForCheck(const String & part_name, t
if (queue.hasDropRange(MergeTreePartInfo::fromPartName(part_name, format_version), &covering_drop_range))
{
LOG_WARNING(log, "Do not enqueue part {} for check because it's covered by DROP_RANGE {} and going to be removed",
part_name, covering_drop_range.getPartName());
part_name, covering_drop_range.getPartNameForLogs());
return;
}
part_check_thread.enqueuePart(part_name, delay_to_check_seconds);