Improve handling of old parts

This commit is contained in:
Raúl Marín 2022-12-28 21:22:40 +01:00
parent 6d27e2ddbc
commit 32d1662503

View File

@ -1836,6 +1836,9 @@ void MergeTreeData::rollbackDeletingParts(const MergeTreeData::DataPartsVector &
void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & parts)
{
if (parts.empty())
return;
{
auto lock = lockParts();
@ -1852,12 +1855,12 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
(*it)->assertState({DataPartState::Deleting});
LOG_DEBUG(log, "Finally removing part from memory {}", part->name);
data_parts_indexes.erase(it);
}
}
LOG_DEBUG(log, "Removing {} parts from memory: Parts: [{}]", parts.size(), fmt::join(parts, ", "));
/// Data parts is still alive (since DataPartsVector holds shared_ptrs) and contain useful metainformation for logging
/// NOTE: There is no need to log parts deletion somewhere else, all deleting parts pass through this function and pass away
@ -1910,13 +1913,14 @@ void MergeTreeData::flushAllInMemoryPartsIfNeeded()
size_t MergeTreeData::clearOldPartsFromFilesystem(bool force)
{
DataPartsVector parts_to_remove = grabOldParts(force);
if (parts_to_remove.empty())
return 0;
clearPartsFromFilesystem(parts_to_remove);
removePartsFinally(parts_to_remove);
/// This is needed to close files to avoid they reside on disk after being deleted.
/// NOTE: we can drop files from cache more selectively but this is good enough.
if (!parts_to_remove.empty())
getContext()->dropMMappedFileCache();
getContext()->dropMMappedFileCache();
return parts_to_remove.size();
}
@ -1980,7 +1984,8 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
ThreadPool pool(num_threads);
/// NOTE: Under heavy system load you may get "Cannot schedule a task" from ThreadPool.
LOG_DEBUG(log, "Removing {} parts from filesystem: {} (concurrently)", parts_to_remove.size(), fmt::join(parts_to_remove, ", "));
LOG_DEBUG(
log, "Removing {} parts from filesystem (concurrently): Parts: [{}]", parts_to_remove.size(), fmt::join(parts_to_remove, ", "));
for (const DataPartPtr & part : parts_to_remove)
{
pool.scheduleOrThrowOnError([&, thread_group = CurrentThread::getGroup()]
@ -2005,7 +2010,8 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
}
else if (!parts_to_remove.empty())
{
LOG_DEBUG(log, "Removing {} parts from filesystem: {}", parts_to_remove.size(), fmt::join(parts_to_remove, ", "));
LOG_DEBUG(
log, "Removing {} parts from filesystem (serially): Parts: [{}]", parts_to_remove.size(), fmt::join(parts_to_remove, ", "));
for (const DataPartPtr & part : parts_to_remove)
{
preparePartForRemoval(part)->remove();