Some trash implementation

This commit is contained in:
alesapin 2022-04-15 18:36:23 +02:00
parent 5a8419a48e
commit 1706ae9e15
6 changed files with 89 additions and 36 deletions

View File

@ -504,8 +504,8 @@ void IMergeTreeDataPart::removeIfNeeded()
if (parent_part)
{
bool keep_shared_data = keepSharedDataInDecoupledStorage();
projectionRemove(parent_part->getFullRelativePath(), keep_shared_data);
auto [can_remove, _] = canRemovePart();
projectionRemove(parent_part->getFullRelativePath(), !can_remove);
}
else
remove();
@ -1526,13 +1526,13 @@ catch (...)
throw;
}
bool IMergeTreeDataPart::keepSharedDataInDecoupledStorage() const
std::pair<bool, std::vector<std::string>> IMergeTreeDataPart::canRemovePart() const
{
/// NOTE: It's needed for zero-copy replication
/// The pair form is unpacked at the call sites as `[can_remove, files_not_to_remove]`;
/// callers hand `!can_remove` to the disk's removeShared* calls as the "keep shared data" flag.
/// With force_keep_shared_data set, the pair form answers `false` with an empty list of names
/// and storage.unlockSharedData() is not called.
if (force_keep_shared_data)
return true;
return std::make_pair(false, std::vector<std::string>{});
/// Otherwise the answer comes from the storage: the bool form negated it ("keep" semantics),
/// the pair form forwards the storage's result unchanged ("can remove" semantics).
return !storage.unlockSharedData(*this);
return storage.unlockSharedData(*this);
}
void IMergeTreeDataPart::initializePartMetadataManager()
@ -1552,7 +1552,7 @@ void IMergeTreeDataPart::remove() const
assert(assertHasValidVersionMetadata());
part_is_probably_removed_from_disk = true;
bool keep_shared_data = keepSharedDataInDecoupledStorage();
auto [can_remove, files_not_to_remove] = canRemovePart();
if (!isStoredOnDisk())
return;
@ -1563,7 +1563,7 @@ void IMergeTreeDataPart::remove() const
if (isProjectionPart())
{
LOG_WARNING(storage.log, "Projection part {} should be removed by its parent {}.", name, parent_part->name);
projectionRemove(parent_part->getFullRelativePath(), keep_shared_data);
projectionRemove(parent_part->getFullRelativePath(), !can_remove);
return;
}
@ -1595,7 +1595,7 @@ void IMergeTreeDataPart::remove() const
LOG_WARNING(storage.log, "Directory {} (to which part must be renamed before removing) already exists. Most likely this is due to unclean restart or race condition. Removing it.", fullPath(disk, to));
try
{
disk->removeSharedRecursive(fs::path(to) / "", keep_shared_data);
disk->removeSharedRecursive(fs::path(to) / "", !can_remove);
}
catch (...)
{
@ -1622,7 +1622,7 @@ void IMergeTreeDataPart::remove() const
std::unordered_set<String> projection_directories;
for (const auto & [p_name, projection_part] : projection_parts)
{
projection_part->projectionRemove(to, keep_shared_data);
projection_part->projectionRemove(to, !can_remove);
projection_directories.emplace(p_name + ".proj");
}
@ -1630,7 +1630,7 @@ void IMergeTreeDataPart::remove() const
if (checksums.empty())
{
/// If the part is not completely written, we cannot use fast path by listing files.
disk->removeSharedRecursive(fs::path(to) / "", keep_shared_data);
disk->removeSharedRecursive(fs::path(to) / "", !can_remove);
}
else
{
@ -1659,7 +1659,7 @@ void IMergeTreeDataPart::remove() const
request.emplace_back(fs::path(to) / DELETE_ON_DESTROY_MARKER_FILE_NAME, true);
request.emplace_back(fs::path(to) / TXN_VERSION_METADATA_FILE_NAME, true);
disk->removeSharedFiles(request, keep_shared_data);
disk->removeSharedFiles(request, !can_remove);
disk->removeDirectory(to);
}
catch (...)
@ -1668,7 +1668,7 @@ void IMergeTreeDataPart::remove() const
LOG_ERROR(storage.log, "Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: {}", fullPath(disk, to), getCurrentExceptionMessage(false));
disk->removeSharedRecursive(fs::path(to) / "", keep_shared_data);
disk->removeSharedRecursive(fs::path(to) / "", !can_remove);
}
}
}

View File

@ -520,7 +520,7 @@ protected:
String getRelativePathForDetachedPart(const String & prefix) const;
bool keepSharedDataInDecoupledStorage() const;
std::pair<bool, std::vector<std::string>> canRemovePart() const;
void initializePartMetadataManager();

View File

@ -1575,6 +1575,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts(bool force)
if (!lock.try_lock())
return res;
bool need_remove_parts_in_order = supportsReplication() && getSettings()->allow_remote_fs_zero_copy_replication;
time_t now = time(nullptr);
std::vector<DataPartIteratorByStateAndInfo> parts_to_delete;
@ -1588,13 +1589,22 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts(bool force)
/// Do not remove outdated part if it may be visible for some transaction
if (!part->version.canBeRemoved())
{
if (need_remove_parts_in_order)
break;
continue;
}
auto part_remove_time = part->remove_time.load(std::memory_order_relaxed);
/// Grab only parts that are not used by anyone (SELECTs for example).
if (!part.unique())
{
if (need_remove_parts_in_order)
break;
continue;
}
if ((part_remove_time < now && now - part_remove_time > getSettings()->old_parts_lifetime.totalSeconds())
|| force
@ -1603,6 +1613,8 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts(bool force)
{
parts_to_delete.emplace_back(it);
}
else if (need_remove_parts_in_order)
break;
}
res.reserve(parts_to_delete.size());
@ -3328,7 +3340,6 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
/// We do not check allow_remote_fs_zero_copy_replication here because data may still be shared
/// if allow_remote_fs_zero_copy_replication was turned on and then turned off again
original_active_part->force_keep_shared_data = false;
if (original_active_part->volume->getDisk()->supportZeroCopyReplication() &&
@ -5724,6 +5735,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
if (disk->exists(dst_part_path))
throw Exception("Part in " + fullPath(disk, dst_part_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
auto dst_data_part = createPart(dst_part_name, dst_part_info, single_disk_volume, tmp_dst_part_name);
/// If source part is in memory, flush it to disk and clone it already in on-disk format
if (auto src_part_in_memory = asInMemoryPart(src_part))
{
@ -5732,14 +5747,22 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
src_part_in_memory->flushToDisk(src_relative_data_path, flushed_part_path, metadata_snapshot);
src_part_path = fs::path(src_relative_data_path) / flushed_part_path / "";
}
else
{
dst_data_part->hardlinked_files.source_part_name = src_part->name;
for (auto it = disk->iterateDirectory(src_part_path); it->isValid(); it->next())
{
if (it->name() != IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME && it->name() != IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME)
dst_data_part->hardlinked_files.hardlinks_from_source_part.push_back(it->name());
}
}
LOG_DEBUG(log, "Cloning part {} to {}", fullPath(disk, src_part_path), fullPath(disk, dst_part_path));
localBackup(disk, src_part_path, dst_part_path, /* make_source_readonly */ false);
disk->removeFileIfExists(fs::path(dst_part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME);
disk->removeFileIfExists(fs::path(dst_part_path) / IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME);
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
auto dst_data_part = createPart(dst_part_name, dst_part_info, single_disk_volume, tmp_dst_part_name);
/// We should write version metadata on part creation to distinguish it from parts that were created without transaction.
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;

View File

@ -943,10 +943,7 @@ public:
/// Unlock shared data part in zookeeper
/// Overridden in StorageReplicatedMergeTree
virtual bool unlockSharedData(const IMergeTreeDataPart &) const { return true; }
/// Remove lock with old name for shared data part after rename
virtual bool unlockSharedData(const IMergeTreeDataPart &, const String &) const { return true; }
/// Base implementation of the single-argument unlockSharedData: always returns
/// {true, empty list of names} without inspecting the part.
virtual std::pair<bool, std::vector<std::string>> unlockSharedData(const IMergeTreeDataPart &) const { return std::make_pair(true, std::vector<std::string>{}); }
/// Fetch part only if some replica has it on shared storage like S3
/// Overridden in StorageReplicatedMergeTree

View File

@ -75,6 +75,7 @@
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string.hpp>
#include <algorithm>
#include <ctime>
@ -7345,7 +7346,16 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part,
boost::replace_all(id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(
*getSettings(), disk->getType(), getTableSharedID(), part.name, zookeeper_path);
*getSettings(), disk->getType(), getTableSharedID(),
part.name, zookeeper_path);
String path_to_set;
if (!part.hardlinked_files.hardlinks_from_source_part.empty())
{
path_to_set = getZeroCopyPartPath(
*getSettings(), disk->getType(), getTableSharedID(),
part.hardlinked_files.source_part_name, zookeeper_path)[0];
}
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
{
@ -7353,18 +7363,20 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part,
LOG_TRACE(log, "Set zookeeper persistent lock {}", zookeeper_node);
createZeroCopyLockNode(zookeeper, zookeeper_node, zkutil::CreateMode::Persistent, replace_existing_lock);
createZeroCopyLockNode(
zookeeper, zookeeper_node, zkutil::CreateMode::Persistent,
replace_existing_lock, path_to_set, part.hardlinked_files.hardlinks_from_source_part);
}
}
bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part) const
std::pair<bool, std::vector<std::string>> StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part) const
{
if (!part.volume || !part.isStoredOnDisk())
return true;
return std::make_pair(true, std::vector<std::string>{});
DiskPtr disk = part.volume->getDisk();
if (!disk || !disk->supportZeroCopyReplication())
return true;
return std::make_pair(true, std::vector<std::string>{});
/// If part is temporary refcount file may be absent
auto ref_count_path = fs::path(part.getFullRelativePath()) / IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK;
@ -7372,20 +7384,20 @@ bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & par
{
auto ref_count = disk->getRefCount(ref_count_path);
if (ref_count > 0) /// Keep part shard info for frozen backups
return false;
return std::make_pair(false, std::vector<std::string>{});
}
else
{
/// Temporary part with some absent file cannot be locked in shared mode
return true;
return std::make_pair(true, std::vector<std::string>{});
}
return unlockSharedDataByID(part.getUniqueId(), getTableSharedID(), part.name, replica_name, disk, getZooKeeper(), *getSettings(), log,
zookeeper_path);
}
bool StorageReplicatedMergeTree::unlockSharedDataByID(String part_id, const String & table_uuid, const String & part_name,
std::pair<bool, std::vector<std::string>> StorageReplicatedMergeTree::unlockSharedDataByID(
String part_id, const String & table_uuid, const String & part_name,
const String & replica_name_, DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_ptr, const MergeTreeSettings & settings,
Poco::Logger * logger, const String & zookeeper_path_old)
{
@ -7394,9 +7406,13 @@ bool StorageReplicatedMergeTree::unlockSharedDataByID(String part_id, const Stri
Strings zc_zookeeper_paths = getZeroCopyPartPath(settings, disk->getType(), table_uuid, part_name, zookeeper_path_old);
bool part_has_no_more_locks = true;
std::vector<std::string> files_not_to_remove;
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
{
files_not_to_remove.clear();
auto content = zookeeper_ptr->get(zc_zookeeper_path);
boost::split(files_not_to_remove, content, boost::is_any_of("\n "));
String zookeeper_part_uniq_node = fs::path(zc_zookeeper_path) / part_id;
/// Delete our replica node for part from zookeeper (we are not interested in it anymore)
@ -7442,7 +7458,7 @@ bool StorageReplicatedMergeTree::unlockSharedDataByID(String part_id, const Stri
}
}
return part_has_no_more_locks;
return std::make_pair(part_has_no_more_locks, files_not_to_remove);
}
@ -7832,7 +7848,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
}
void StorageReplicatedMergeTree::createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node, int32_t mode, bool replace_existing_lock)
void StorageReplicatedMergeTree::createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node, int32_t mode, bool replace_existing_lock, const std::string & path_to_set, const std::vector<std::string> & hardlinked_files)
{
/// In rare cases another replica can remove the path between createAncestors and createIfNotExists,
/// so we make up to 5 attempts
@ -7852,6 +7868,11 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(const zkutil::ZooKeeperP
Coordination::Requests ops;
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_node, -1));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode));
if (!path_to_set.empty() && !hardlinked_files.empty())
{
std::string data = boost::algorithm::join(hardlinked_files, "\n");
ops.emplace_back(zkutil::makeSetRequest(path_to_set, data, -1));
}
Coordination::Responses responses;
auto error = zookeeper->tryMulti(ops, responses);
if (error == Coordination::Error::ZOK)
@ -7859,7 +7880,16 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(const zkutil::ZooKeeperP
}
else
{
auto error = zookeeper->tryCreate(zookeeper_node, "", mode);
Coordination::Requests ops;
if (!path_to_set.empty() && !hardlinked_files.empty())
{
std::string data = boost::algorithm::join(hardlinked_files, "\n");
ops.emplace_back(zkutil::makeSetRequest(path_to_set, data, -1));
}
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode));
Coordination::Responses responses;
auto error = zookeeper->tryMulti(ops, responses);
if (error == Coordination::Error::ZOK || error == Coordination::Error::ZNODEEXISTS)
break;
}
@ -8002,9 +8032,12 @@ bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const St
if (disk->getRefCount(checksums) == 0)
{
String id = disk->getUniqueId(checksums);
keep_shared = !StorageReplicatedMergeTree::unlockSharedDataByID(id, table_uuid, part_name,
bool can_remove = false;
std::tie(can_remove, std::ignore) = StorageReplicatedMergeTree::unlockSharedDataByID(id, table_uuid, part_name,
detached_replica_name, disk, zookeeper, getContext()->getReplicatedMergeTreeSettings(), log,
detached_zookeeper_path);
keep_shared = !can_remove;
}
else
keep_shared = true;

View File

@ -236,12 +236,12 @@ public:
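/// Unlock shared data part in zookeeper
/// The first element of the result is true if data unlocked,
/// false if data is still used by another node
/// The second element is a list of names; it is empty on every early-return path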
bool unlockSharedData(const IMergeTreeDataPart & part) const override;
std::pair<bool, std::vector<std::string>> unlockSharedData(const IMergeTreeDataPart & part) const override;
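/// Unlock shared data part in zookeeper by part id
/// The first element of the result is true if data unlocked,
/// false if data is still used by another node
/// The second element holds the names obtained by splitting the content of the
/// zero-copy part node in zookeeper (of the last zero-copy path processed)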
static bool unlockSharedDataByID(String part_id, const String & table_uuid, const String & part_name, const String & replica_name_,
static std::pair<bool, std::vector<std::string>> unlockSharedDataByID(String part_id, const String & table_uuid, const String & part_name, const String & replica_name_,
DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_, const MergeTreeSettings & settings, Poco::Logger * logger,
const String & zookeeper_path_old);
@ -756,7 +756,7 @@ private:
static Strings getZeroCopyPartPath(const MergeTreeSettings & settings, DiskType disk_type, const String & table_uuid,
const String & part_name, const String & zookeeper_path_old);
static void createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node, int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false);
static void createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node, int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false, const std::string & path_to_set = "", const std::vector<std::string> & hardlinked_files = {});
bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed) override;