make code less ugly

This commit is contained in:
Alexander Tokmakov 2021-06-09 15:36:47 +03:00
parent 3ade38df82
commit cef22688ff
3 changed files with 39 additions and 28 deletions

View File

@ -429,9 +429,14 @@ void IMergeTreeDataPart::removeIfNeeded()
} }
if (parent_part) if (parent_part)
projectionRemove(parent_part->getFullRelativePath(), keep_s3_on_delete); {
std::optional<bool> keep_shared_data = keepSharedDataInDecoupledStorage();
if (!keep_shared_data.has_value())
return;
projectionRemove(parent_part->getFullRelativePath(), *keep_shared_data);
}
else else
remove(keep_s3_on_delete); remove();
if (state == State::DeleteOnDestroy) if (state == State::DeleteOnDestroy)
{ {
@ -1096,24 +1101,30 @@ void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_
storage.lockSharedData(*this); storage.lockSharedData(*this);
} }
std::optional<bool> IMergeTreeDataPart::keepSharedDataInDecoupledStorage() const
void IMergeTreeDataPart::removeIfNotLockedInS3() const
{ {
/// NOTE: It's needed for S3 zero-copy replication
if (force_keep_shared_data)
return true;
/// TODO Unlocking in try-catch and ignoring exception look ugly
try try
{ {
/// TODO Unlocking in try-catch looks ugly. Special "keep_s3" flag return !storage.unlockSharedData(*this);
/// which is a bit different from "keep_s3_on_delete" flag looks ugly too.
bool keep_s3 = !storage.unlockSharedData(*this);
remove(keep_s3);
} }
catch (...) catch (...)
{ {
tryLogCurrentException(__PRETTY_FUNCTION__, "There is a problem with deleting part " + name + " from filesystem"); tryLogCurrentException(__PRETTY_FUNCTION__, "There is a problem with deleting part " + name + " from filesystem");
} }
return {};
} }
void IMergeTreeDataPart::remove(bool keep_s3) const void IMergeTreeDataPart::remove() const
{ {
std::optional<bool> keep_shared_data = keepSharedDataInDecoupledStorage();
if (!keep_shared_data.has_value())
return;
if (!isStoredOnDisk()) if (!isStoredOnDisk())
return; return;
@ -1123,7 +1134,7 @@ void IMergeTreeDataPart::remove(bool keep_s3) const
if (isProjectionPart()) if (isProjectionPart())
{ {
LOG_WARNING(storage.log, "Projection part {} should be removed by its parent {}.", name, parent_part->name); LOG_WARNING(storage.log, "Projection part {} should be removed by its parent {}.", name, parent_part->name);
projectionRemove(parent_part->getFullRelativePath(), keep_s3); projectionRemove(parent_part->getFullRelativePath(), *keep_shared_data);
return; return;
} }
@ -1149,7 +1160,7 @@ void IMergeTreeDataPart::remove(bool keep_s3) const
LOG_WARNING(storage.log, "Directory {} (to which part must be renamed before removing) already exists. Most likely this is due to unclean restart. Removing it.", fullPath(disk, to)); LOG_WARNING(storage.log, "Directory {} (to which part must be renamed before removing) already exists. Most likely this is due to unclean restart. Removing it.", fullPath(disk, to));
try try
{ {
disk->removeSharedRecursive(fs::path(to) / "", keep_s3); disk->removeSharedRecursive(fs::path(to) / "", *keep_shared_data);
} }
catch (...) catch (...)
{ {
@ -1176,7 +1187,7 @@ void IMergeTreeDataPart::remove(bool keep_s3) const
std::unordered_set<String> projection_directories; std::unordered_set<String> projection_directories;
for (const auto & [p_name, projection_part] : projection_parts) for (const auto & [p_name, projection_part] : projection_parts)
{ {
projection_part->projectionRemove(to, keep_s3); projection_part->projectionRemove(to, *keep_shared_data);
projection_directories.emplace(p_name + ".proj"); projection_directories.emplace(p_name + ".proj");
} }
@ -1184,7 +1195,7 @@ void IMergeTreeDataPart::remove(bool keep_s3) const
if (checksums.empty()) if (checksums.empty())
{ {
/// If the part is not completely written, we cannot use fast path by listing files. /// If the part is not completely written, we cannot use fast path by listing files.
disk->removeSharedRecursive(fs::path(to) / "", keep_s3); disk->removeSharedRecursive(fs::path(to) / "", *keep_shared_data);
} }
else else
{ {
@ -1199,17 +1210,17 @@ void IMergeTreeDataPart::remove(bool keep_s3) const
for (const auto & [file, _] : checksums.files) for (const auto & [file, _] : checksums.files)
{ {
if (projection_directories.find(file) == projection_directories.end()) if (projection_directories.find(file) == projection_directories.end())
disk->removeSharedFile(fs::path(to) / file, keep_s3); disk->removeSharedFile(fs::path(to) / file, *keep_shared_data);
} }
#if !defined(__clang__) #if !defined(__clang__)
# pragma GCC diagnostic pop # pragma GCC diagnostic pop
#endif #endif
for (const auto & file : {"checksums.txt", "columns.txt"}) for (const auto & file : {"checksums.txt", "columns.txt"})
disk->removeSharedFile(fs::path(to) / file, keep_s3); disk->removeSharedFile(fs::path(to) / file, *keep_shared_data);
disk->removeSharedFileIfExists(fs::path(to) / DEFAULT_COMPRESSION_CODEC_FILE_NAME, keep_s3); disk->removeSharedFileIfExists(fs::path(to) / DEFAULT_COMPRESSION_CODEC_FILE_NAME, *keep_shared_data);
disk->removeSharedFileIfExists(fs::path(to) / DELETE_ON_DESTROY_MARKER_FILE_NAME, keep_s3); disk->removeSharedFileIfExists(fs::path(to) / DELETE_ON_DESTROY_MARKER_FILE_NAME, *keep_shared_data);
disk->removeDirectory(to); disk->removeDirectory(to);
} }
@ -1219,7 +1230,7 @@ void IMergeTreeDataPart::remove(bool keep_s3) const
LOG_ERROR(storage.log, "Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: {}", fullPath(disk, to), getCurrentExceptionMessage(false)); LOG_ERROR(storage.log, "Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: {}", fullPath(disk, to), getCurrentExceptionMessage(false));
disk->removeSharedRecursive(fs::path(to) / "", keep_s3); disk->removeSharedRecursive(fs::path(to) / "", *keep_shared_data);
} }
} }
} }

View File

@ -126,8 +126,7 @@ public:
/// Throws an exception if part is not stored in on-disk format. /// Throws an exception if part is not stored in on-disk format.
void assertOnDisk() const; void assertOnDisk() const;
void remove(bool keep_s3 = false) const; void remove() const;
void removeIfNotLockedInS3() const;
void projectionRemove(const String & parent_to, bool keep_s3 = false) const; void projectionRemove(const String & parent_to, bool keep_s3 = false) const;
@ -199,7 +198,7 @@ public:
mutable std::atomic<bool> is_frozen {false}; mutable std::atomic<bool> is_frozen {false};
/// Flag for keep S3 data when zero-copy replication over S3 turned on. /// Flag for keep S3 data when zero-copy replication over S3 turned on.
mutable bool keep_s3_on_delete = false; mutable bool force_keep_shared_data = false;
/** /**
* Part state is a stage of its lifetime. States are ordered and state of a part could be increased only. * Part state is a stage of its lifetime. States are ordered and state of a part could be increased only.
@ -426,6 +425,8 @@ protected:
String getRelativePathForDetachedPart(const String & prefix) const; String getRelativePathForDetachedPart(const String & prefix) const;
std::optional<bool> keepSharedDataInDecoupledStorage() const;
private: private:
/// In compact parts order of columns is necessary /// In compact parts order of columns is necessary
NameToNumber column_name_to_position; NameToNumber column_name_to_position;

View File

@ -1313,7 +1313,7 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
CurrentThread::attachTo(thread_group); CurrentThread::attachTo(thread_group);
LOG_DEBUG(log, "Removing part from filesystem {}", part->name); LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
part->removeIfNotLockedInS3(); part->remove();
}); });
} }
@ -1324,7 +1324,7 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
for (const DataPartPtr & part : parts_to_remove) for (const DataPartPtr & part : parts_to_remove)
{ {
LOG_DEBUG(log, "Removing part from filesystem {}", part->name); LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
part->removeIfNotLockedInS3(); part->remove();
} }
} }
} }
@ -2736,17 +2736,16 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
/// We do not check allow_s3_zero_copy_replication here because data may be shared /// We do not check allow_s3_zero_copy_replication here because data may be shared
/// when allow_s3_zero_copy_replication turned on and off again /// when allow_s3_zero_copy_replication turned on and off again
original_active_part->keep_s3_on_delete = false; original_active_part->force_keep_shared_data = false;
if (original_active_part->volume->getDisk()->getType() == DiskType::Type::S3) if (original_active_part->volume->getDisk()->getType() == DiskType::Type::S3)
{ {
if (part_copy->volume->getDisk()->getType() == DiskType::Type::S3 if (part_copy->volume->getDisk()->getType() == DiskType::Type::S3
&& original_active_part->getUniqueId() == part_copy->getUniqueId()) && original_active_part->getUniqueId() == part_copy->getUniqueId())
{ /// May be when several volumes use the same S3 storage {
original_active_part->keep_s3_on_delete = true; /// May be when several volumes use the same S3 storage
original_active_part->force_keep_shared_data = true;
} }
else
original_active_part->keep_s3_on_delete = !unlockSharedData(*original_active_part);
} }
modifyPartState(original_active_part, DataPartState::DeleteOnDestroy); modifyPartState(original_active_part, DataPartState::DeleteOnDestroy);