remove copypaste

This commit is contained in:
Alexander Tokmakov 2021-06-08 22:11:22 +03:00
parent 9291834e36
commit 3ade38df82
5 changed files with 20 additions and 57 deletions

View File

@ -1097,6 +1097,21 @@ void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_
}
void IMergeTreeDataPart::removeIfNotLockedInS3() const
{
try
{
/// TODO Unlocking in try-catch looks ugly. Special "keep_s3" flag
/// which is a bit different from "keep_s3_on_delete" flag looks ugly too.
bool keep_s3 = !storage.unlockSharedData(*this);
remove(keep_s3);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__, "There is a problem with deleting part " + name + " from filesystem");
}
}
void IMergeTreeDataPart::remove(bool keep_s3) const
{
if (!isStoredOnDisk())

View File

@ -127,6 +127,7 @@ public:
void assertOnDisk() const;
void remove(bool keep_s3 = false) const;
void removeIfNotLockedInS3() const;
void projectionRemove(const String & parent_to, bool keep_s3 = false) const;

View File

@ -1313,7 +1313,7 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
CurrentThread::attachTo(thread_group);
LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
part->remove();
part->removeIfNotLockedInS3();
});
}
@ -1324,7 +1324,7 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
for (const DataPartPtr & part : parts_to_remove)
{
LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
part->remove();
part->removeIfNotLockedInS3();
}
}
}

View File

@ -5939,57 +5939,6 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio
return CancellationCode::CancelSent;
}
void StorageReplicatedMergeTree::removePartsFromFilesystem(const DataPartsVector & parts)
{
auto remove_part = [&](const auto & part)
{
LOG_DEBUG(log, "Removing part from filesystem {}", part.name);
try
{
bool keep_s3 = !this->unlockSharedData(part);
part.remove(keep_s3);
}
catch (...)
{
tryLogCurrentException(log, "There is a problem with deleting part " + part.name + " from filesystem");
}
};
const auto settings = getSettings();
if (settings->max_part_removal_threads > 1 && parts.size() > settings->concurrent_part_removal_threshold)
{
/// Parallel parts removal.
size_t num_threads = std::min<size_t>(settings->max_part_removal_threads, parts.size());
ThreadPool pool(num_threads);
/// NOTE: Under heavy system load you may get "Cannot schedule a task" from ThreadPool.
for (const DataPartPtr & part : parts)
{
pool.scheduleOrThrowOnError([&, thread_group = CurrentThread::getGroup()]
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachTo(thread_group);
remove_part(*part);
});
}
pool.wait();
}
else
{
for (const DataPartPtr & part : parts)
{
remove_part(*part);
}
}
}
void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
{
auto table_lock = lockForShare(
@ -6017,7 +5966,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
/// Delete duplicate parts from filesystem
if (!parts_to_delete_only_from_filesystem.empty())
{
removePartsFromFilesystem(parts_to_delete_only_from_filesystem);
clearPartsFromFilesystem(parts_to_delete_only_from_filesystem);
removePartsFinally(parts_to_delete_only_from_filesystem);
LOG_DEBUG(log, "Removed {} old duplicate parts", parts_to_delete_only_from_filesystem.size());
@ -6062,7 +6011,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
/// Remove parts from filesystem and finally from data_parts
if (!parts_to_remove_from_filesystem.empty())
{
removePartsFromFilesystem(parts_to_remove_from_filesystem);
clearPartsFromFilesystem(parts_to_remove_from_filesystem);
removePartsFinally(parts_to_remove_from_filesystem);
LOG_DEBUG(log, "Removed {} old parts", parts_to_remove_from_filesystem.size());

View File

@ -439,8 +439,6 @@ private:
/// Just removes part from ZooKeeper using previous method
void removePartFromZooKeeper(const String & part_name);
void removePartsFromFilesystem(const DataPartsVector & parts);
/// Quickly removes big set of parts from ZooKeeper (using async multi queries)
void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names,
NameSet * parts_should_be_retried = nullptr);