Fix forgotten parts in cleanup thread

This commit is contained in:
alesapin 2022-05-08 00:53:55 +02:00
parent 47edcdcb1b
commit 46712f1d98
3 changed files with 126 additions and 9 deletions

View File

@ -1754,7 +1754,28 @@ size_t MergeTreeData::clearOldPartsFromFilesystem(bool force)
return parts_to_remove.size();
}
void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_remove)
void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts)
{
clearPartsFromFilesystemImpl(parts, true, nullptr);
}
void MergeTreeData::tryClearPartsFromFilesystem(const DataPartsVector & parts, NameSet & parts_failed_to_delete)
{
NameSet part_names_successeded;
clearPartsFromFilesystemImpl(parts, false, &part_names_successeded);
if (part_names_successeded.size() == parts.size())
return;
for (const auto & part : parts)
{
if (!part_names_successeded.contains(part->name))
parts_failed_to_delete.insert(part->name);
}
}
void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_to_remove, bool throw_on_error, NameSet * part_names_successed)
{
const auto settings = getSettings();
if (parts_to_remove.size() > 1 && settings->max_part_removal_threads > 1 && parts_to_remove.size() > settings->concurrent_part_removal_threshold)
@ -1763,6 +1784,7 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
size_t num_threads = std::min<size_t>(settings->max_part_removal_threads, parts_to_remove.size());
ThreadPool pool(num_threads);
std::mutex part_names_mutex;
/// NOTE: Under heavy system load you may get "Cannot schedule a task" from ThreadPool.
for (const DataPartPtr & part : parts_to_remove)
@ -1774,17 +1796,50 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
part->remove();
if (part_names_successed)
{
std::lock_guard lock(part_names_mutex);
part_names_successed->insert(part->name);
}
});
}
if (throw_on_error)
{
pool.wait();
}
else
{
try
{
pool.wait();
}
catch (...)
{
tryLogCurrentException(log);
}
}
}
else
{
for (const DataPartPtr & part : parts_to_remove)
{
LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
try
{
part->remove();
if (part_names_successed)
part_names_successed->insert(part->name);
}
catch (...)
{
if (throw_on_error)
throw;
tryLogCurrentException(log);
break;
}
}
}
}

View File

@ -615,7 +615,10 @@ public:
/// Delete irrelevant parts from memory and disk.
/// If 'force' - don't wait for old_parts_lifetime.
size_t clearOldPartsFromFilesystem(bool force = false);
/// Remove the given parts from the filesystem.
/// An exception raised while removing a part is propagated to the caller.
void clearPartsFromFilesystem(const DataPartsVector & parts);
/// Try to remove the given parts from the filesystem. Failures of part removal are logged instead of being rethrown.
/// The name of every part that was not removed is inserted into parts_failed_to_delete
/// (the set is only appended to, it is not cleared first).
void tryClearPartsFromFilesystem(const DataPartsVector & parts, NameSet & parts_failed_to_delete);
/// Delete WAL files containing parts, that all already stored on disk.
size_t clearOldWriteAheadLogs();
@ -1298,6 +1301,11 @@ private:
/// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree.
virtual std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String &, const DiskPtr &) { return std::nullopt; }
/// Remove parts from disk by calling part->remove().
/// Removal is done in a thread pool when there is more than one part, max_part_removal_threads > 1
/// and the number of parts exceeds concurrent_part_removal_threshold; otherwise parts are removed one by one.
/// If removal of some part fails and throw_on_error is `true`, the exception is propagated to the caller.
/// If throw_on_error is `false`, the exception is logged instead; in the non-parallel case the loop
/// also stops at the first failed part, so the remaining parts are not touched.
/// If part_names_successed is not null, the name of every successfully removed part is inserted into it.
void clearPartsFromFilesystemImpl(const DataPartsVector & parts, bool throw_on_error, NameSet * part_names_successed);
TemporaryParts temporary_parts;
};

View File

@ -6112,6 +6112,8 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
auto zookeeper = getZooKeeper();
/// Now these parts are in Deleting state. If we fail to remove some of them we must roll them back to Outdated state.
/// Otherwise they will not be deleted.
DataPartsVector parts = grabOldParts();
if (parts.empty())
return;
@ -6133,10 +6135,35 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
/// Delete duplicate parts from filesystem
if (!parts_to_delete_only_from_filesystem.empty())
{
clearPartsFromFilesystem(parts_to_delete_only_from_filesystem);
removePartsFinally(parts_to_delete_only_from_filesystem);
/// An error may occur while removing a part from the filesystem.
/// In that case we have to change the state of the failed parts from Deleting back to Outdated.
/// Otherwise nobody will try to remove them again (see grabOldParts).
NameSet parts_failed_to_delete;
tryClearPartsFromFilesystem(parts_to_delete_only_from_filesystem, parts_failed_to_delete);
LOG_DEBUG(log, "Removed {} old duplicate parts", parts_to_delete_only_from_filesystem.size());
DataPartsVector finally_remove_parts;
if (!parts_failed_to_delete.empty())
{
DataPartsVector rollback_parts;
for (const auto & part : parts_to_delete_only_from_filesystem)
{
if (!parts_failed_to_delete.contains(part->name))
finally_remove_parts.push_back(part);
else
rollback_parts.push_back(part);
}
if (!rollback_parts.empty())
rollbackDeletingParts(rollback_parts);
}
else
{
finally_remove_parts = parts_to_delete_only_from_filesystem;
}
removePartsFinally(finally_remove_parts);
LOG_DEBUG(log, "Removed {} old duplicate parts", finally_remove_parts.size());
}
/// Delete normal parts from ZooKeeper
@ -6175,13 +6202,40 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
LOG_DEBUG(log, "Will retry deletion of {} parts in the next time", parts_to_retry_deletion.size());
}
/// Remove parts from filesystem and finally from data_parts
if (!parts_to_remove_from_filesystem.empty())
{
clearPartsFromFilesystem(parts_to_remove_from_filesystem);
removePartsFinally(parts_to_remove_from_filesystem);
/// An error may occur while removing a part from the filesystem.
/// In that case we have to change the state of the failed parts from Deleting back to Outdated.
/// Otherwise nobody will try to remove them again (see grabOldParts).
NameSet parts_failed_to_delete;
tryClearPartsFromFilesystem(parts_to_remove_from_filesystem, parts_failed_to_delete);
LOG_DEBUG(log, "Removed {} old parts", parts_to_remove_from_filesystem.size());
/// Split parts_to_remove_from_filesystem by outcome: parts whose files were removed are
/// finalized via removePartsFinally, parts that failed are handed to rollbackDeletingParts.
DataPartsVector finally_remove_parts;
if (!parts_failed_to_delete.empty())
{
    DataPartsVector rollback_parts;
    for (const auto & part : parts_to_remove_from_filesystem)
    {
        if (!parts_failed_to_delete.contains(part->name))
            finally_remove_parts.push_back(part);
        else
            rollback_parts.push_back(part);
    }

    if (!rollback_parts.empty())
        rollbackDeletingParts(rollback_parts);
}
else
{
    /// Nothing failed: finalize exactly the parts that were just removed from the filesystem.
    /// This must be parts_to_remove_from_filesystem, not parts_to_delete_only_from_filesystem:
    /// the latter belongs to the duplicate-parts section above and using it here would leave
    /// the parts removed in this section un-finalized.
    finally_remove_parts = parts_to_remove_from_filesystem;
}

removePartsFinally(finally_remove_parts);
LOG_DEBUG(log, "Removed {} old parts", finally_remove_parts.size());
}
}