Remove more trash

This commit is contained in:
alesapin 2022-04-15 17:05:17 +02:00
parent eb7593f786
commit 5a8419a48e
5 changed files with 35 additions and 28 deletions

View File

@ -504,10 +504,8 @@ void IMergeTreeDataPart::removeIfNeeded()
if (parent_part)
{
std::optional<bool> keep_shared_data = keepSharedDataInDecoupledStorage();
if (!keep_shared_data.has_value())
return;
projectionRemove(parent_part->getFullRelativePath(), *keep_shared_data);
bool keep_shared_data = keepSharedDataInDecoupledStorage();
projectionRemove(parent_part->getFullRelativePath(), keep_shared_data);
}
else
remove();
@ -1528,7 +1526,7 @@ catch (...)
throw;
}
std::optional<bool> IMergeTreeDataPart::keepSharedDataInDecoupledStorage() const
bool IMergeTreeDataPart::keepSharedDataInDecoupledStorage() const
{
/// NOTE: It's needed for zero-copy replication
if (force_keep_shared_data)
@ -1554,9 +1552,7 @@ void IMergeTreeDataPart::remove() const
assert(assertHasValidVersionMetadata());
part_is_probably_removed_from_disk = true;
std::optional<bool> keep_shared_data = keepSharedDataInDecoupledStorage();
if (!keep_shared_data.has_value())
return;
bool keep_shared_data = keepSharedDataInDecoupledStorage();
if (!isStoredOnDisk())
return;
@ -1567,7 +1563,7 @@ void IMergeTreeDataPart::remove() const
if (isProjectionPart())
{
LOG_WARNING(storage.log, "Projection part {} should be removed by its parent {}.", name, parent_part->name);
projectionRemove(parent_part->getFullRelativePath(), *keep_shared_data);
projectionRemove(parent_part->getFullRelativePath(), keep_shared_data);
return;
}
@ -1599,7 +1595,7 @@ void IMergeTreeDataPart::remove() const
LOG_WARNING(storage.log, "Directory {} (to which part must be renamed before removing) already exists. Most likely this is due to unclean restart or race condition. Removing it.", fullPath(disk, to));
try
{
disk->removeSharedRecursive(fs::path(to) / "", *keep_shared_data);
disk->removeSharedRecursive(fs::path(to) / "", keep_shared_data);
}
catch (...)
{
@ -1626,7 +1622,7 @@ void IMergeTreeDataPart::remove() const
std::unordered_set<String> projection_directories;
for (const auto & [p_name, projection_part] : projection_parts)
{
projection_part->projectionRemove(to, *keep_shared_data);
projection_part->projectionRemove(to, keep_shared_data);
projection_directories.emplace(p_name + ".proj");
}
@ -1634,7 +1630,7 @@ void IMergeTreeDataPart::remove() const
if (checksums.empty())
{
/// If the part is not completely written, we cannot use fast path by listing files.
disk->removeSharedRecursive(fs::path(to) / "", *keep_shared_data);
disk->removeSharedRecursive(fs::path(to) / "", keep_shared_data);
}
else
{
@ -1663,7 +1659,7 @@ void IMergeTreeDataPart::remove() const
request.emplace_back(fs::path(to) / DELETE_ON_DESTROY_MARKER_FILE_NAME, true);
request.emplace_back(fs::path(to) / TXN_VERSION_METADATA_FILE_NAME, true);
disk->removeSharedFiles(request, *keep_shared_data);
disk->removeSharedFiles(request, keep_shared_data);
disk->removeDirectory(to);
}
catch (...)
@ -1672,7 +1668,7 @@ void IMergeTreeDataPart::remove() const
LOG_ERROR(storage.log, "Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: {}", fullPath(disk, to), getCurrentExceptionMessage(false));
disk->removeSharedRecursive(fs::path(to) / "", *keep_shared_data);
disk->removeSharedRecursive(fs::path(to) / "", keep_shared_data);
}
}
}

View File

@ -331,6 +331,14 @@ public:
mutable VersionMetadata version;
struct HardlinkedFiles
{
std::string source_part_name;
std::vector<std::string> hardlinks_from_source_part;
};
HardlinkedFiles hardlinked_files;
/// For data in RAM ('index')
UInt64 getIndexSizeInBytes() const;
UInt64 getIndexSizeInAllocatedBytes() const;
@ -512,7 +520,7 @@ protected:
String getRelativePathForDetachedPart(const String & prefix) const;
std::optional<bool> keepSharedDataInDecoupledStorage() const;
bool keepSharedDataInDecoupledStorage() const;
void initializePartMetadataManager();

View File

@ -1070,6 +1070,7 @@ private:
ctx->new_data_part->version.setCreationTID(tid, nullptr);
ctx->new_data_part->storeVersionMetadata();
std::vector<std::string> hardlinked_files;
/// Create hardlinks for unchanged files
for (auto it = ctx->disk->iterateDirectory(ctx->source_part->getFullRelativePath()); it->isValid(); it->next())
{
@ -1083,10 +1084,12 @@ private:
{
return rename_pair.first == file_name;
});
if (rename_it != ctx->files_to_rename.end())
{
if (rename_it->second.empty())
continue;
destination += rename_it->second;
}
else
@ -1094,8 +1097,13 @@ private:
destination += it->name();
}
if (!ctx->disk->isDirectory(it->path()))
{
ctx->disk->createHardLink(it->path(), destination);
hardlinked_files.push_back(it->name());
}
else if (!endsWith(".tmp_proj", it->name())) // ignore projection tmp merge dir
{
// it's a projection part directory
@ -1104,10 +1112,14 @@ private:
{
String p_destination = fs::path(destination) / p_it->name();
ctx->disk->createHardLink(p_it->path(), p_destination);
hardlinked_files.push_back(p_it->name());
}
}
}
ctx->new_data_part->hardlinked_files.source_part_name = ctx->source_part->name;
ctx->new_data_part->hardlinked_files.hardlinks_from_source_part = hardlinked_files;
(*ctx->mutate_entry)->columns_written = ctx->storage_columns.size() - ctx->updated_header.columns();
ctx->new_data_part->checksums = ctx->source_part->checksums;

View File

@ -7344,8 +7344,9 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part,
String id = part.getUniqueId();
boost::replace_all(id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), disk->getType(), getTableSharedID(),
part.name, zookeeper_path);
Strings zc_zookeeper_paths = getZeroCopyPartPath(
*getSettings(), disk->getType(), getTableSharedID(), part.name, zookeeper_path);
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
{
String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
@ -7356,14 +7357,7 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part,
}
}
bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part) const
{
return unlockSharedData(part, part.name);
}
bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part, const String & name) const
{
if (!part.volume || !part.isStoredOnDisk())
return true;
@ -7386,7 +7380,7 @@ bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & par
return true;
}
return unlockSharedDataByID(part.getUniqueId(), getTableSharedID(), name, replica_name, disk, getZooKeeper(), *getSettings(), log,
return unlockSharedDataByID(part.getUniqueId(), getTableSharedID(), part.name, replica_name, disk, getZooKeeper(), *getSettings(), log,
zookeeper_path);
}

View File

@ -238,9 +238,6 @@ public:
/// Return false if data is still used by another node
bool unlockSharedData(const IMergeTreeDataPart & part) const override;
/// Remove lock with old name for shared data part after rename
bool unlockSharedData(const IMergeTreeDataPart & part, const String & name) const override;
/// Unlock shared data part in zookeeper by part id
/// Return true if data unlocked
/// Return false if data is still used by another node