Some code

This commit is contained in:
alesapin 2022-04-19 01:09:09 +02:00
parent a97754f462
commit bd7b3847c1
22 changed files with 148 additions and 82 deletions

View File

@ -309,23 +309,26 @@ void DiskCacheWrapper::removeSharedFile(const String & path, bool keep_s3)
DiskDecorator::removeSharedFile(path, keep_s3);
}
void DiskCacheWrapper::removeSharedRecursive(const String & path, bool keep_s3)
void DiskCacheWrapper::removeSharedRecursive(const String & path, bool keep_all, const NameSet & files_to_keep)
{
if (cache_disk->exists(path))
cache_disk->removeSharedRecursive(path, keep_s3);
DiskDecorator::removeSharedRecursive(path, keep_s3);
cache_disk->removeSharedRecursive(path, keep_all, files_to_keep);
DiskDecorator::removeSharedRecursive(path, keep_all, files_to_keep);
}
void DiskCacheWrapper::removeSharedFiles(const RemoveBatchRequest & files, bool keep_s3)
void DiskCacheWrapper::removeSharedFiles(const RemoveBatchRequest & files, bool keep_all, const NameSet & files_to_keep)
{
for (const auto & file : files)
{
if (cache_disk->exists(file.path))
cache_disk->removeSharedFile(file.path, keep_s3);
{
bool keep_file = keep_all || files_to_keep.contains(fs::path(file.path).filename());
cache_disk->removeSharedFile(file.path, keep_file);
}
}
DiskDecorator::removeSharedFiles(files, keep_s3);
DiskDecorator::removeSharedFiles(files, keep_all, files_to_keep);
}
void DiskCacheWrapper::createHardLink(const String & src_path, const String & dst_path)

View File

@ -47,8 +47,9 @@ public:
void removeDirectory(const String & path) override;
void removeRecursive(const String & path) override;
void removeSharedFile(const String & path, bool keep_s3) override;
void removeSharedRecursive(const String & path, bool keep_s3) override;
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_s3) override;
void removeSharedRecursive(const String & path, bool keep_all, const NameSet & files_to_keep) override;
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all, const NameSet & files_to_keep) override;
void createHardLink(const String & src_path, const String & dst_path) override;
ReservationPtr reserve(UInt64 bytes) override;

View File

@ -151,14 +151,14 @@ void DiskDecorator::removeSharedFile(const String & path, bool keep_s3)
delegate->removeSharedFile(path, keep_s3);
}
void DiskDecorator::removeSharedFiles(const RemoveBatchRequest & files, bool keep_in_remote_fs)
void DiskDecorator::removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_metadata, const NameSet & file_names_remove_metadata_only)
{
delegate->removeSharedFiles(files, keep_in_remote_fs);
delegate->removeSharedFiles(files, keep_all_batch_metadata, file_names_remove_metadata_only);
}
void DiskDecorator::removeSharedRecursive(const String & path, bool keep_s3)
void DiskDecorator::removeSharedRecursive(const String & path, bool keep_all_batch_metadata, const NameSet & file_names_remove_metadata_only)
{
delegate->removeSharedRecursive(path, keep_s3);
delegate->removeSharedRecursive(path, keep_all_batch_metadata, file_names_remove_metadata_only);
}
void DiskDecorator::setLastModified(const String & path, const Poco::Timestamp & timestamp)

View File

@ -52,8 +52,8 @@ public:
void removeDirectory(const String & path) override;
void removeRecursive(const String & path) override;
void removeSharedFile(const String & path, bool keep_s3) override;
void removeSharedRecursive(const String & path, bool keep_s3) override;
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_in_remote_fs) override;
void removeSharedRecursive(const String & path, bool keep_all_batch_metadata, const NameSet & file_names_remove_metadata_only) override;
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_metadata, const NameSet & file_names_remove_metadata_only) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
Poco::Timestamp getLastModified(const String & path) override;
void setReadOnly(const String & path) override;

View File

@ -159,10 +159,23 @@ public:
delegate->removeSharedFile(wrapped_path, flag);
}
void removeSharedRecursive(const String & path, bool flag) override
void removeSharedRecursive(const String & path, bool keep_all_batch_metadata, const NameSet & file_names_remove_metadata_only) override
{
auto wrapped_path = wrappedPath(path);
delegate->removeSharedRecursive(wrapped_path, flag);
delegate->removeSharedRecursive(wrapped_path, keep_all_batch_metadata, file_names_remove_metadata_only);
}
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_metadata, const NameSet & file_names_remove_metadata_only) override
{
for (const auto & file : files)
{
auto wrapped_path = wrappedPath(file.path);
bool keep = keep_all_batch_metadata || file_names_remove_metadata_only.contains(fs::path(file.path).filename());
if (file.if_exists)
delegate->removeSharedFileIfExists(wrapped_path, keep);
else
delegate->removeSharedFile(wrapped_path, keep);
}
}
void removeSharedFileIfExists(const String & path, bool flag) override

View File

@ -251,16 +251,16 @@ void DiskRestartProxy::removeSharedFile(const String & path, bool keep_s3)
DiskDecorator::removeSharedFile(path, keep_s3);
}
void DiskRestartProxy::removeSharedFiles(const RemoveBatchRequest & files, bool keep_in_remote_fs)
void DiskRestartProxy::removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_metadata, const NameSet & file_names_remove_metadata_only)
{
ReadLock lock (mutex);
DiskDecorator::removeSharedFiles(files, keep_in_remote_fs);
DiskDecorator::removeSharedFiles(files, keep_all_batch_metadata, file_names_remove_metadata_only);
}
void DiskRestartProxy::removeSharedRecursive(const String & path, bool keep_s3)
void DiskRestartProxy::removeSharedRecursive(const String & path, bool keep_all_batch_metadata, const NameSet & file_names_remove_metadata_only)
{
ReadLock lock (mutex);
DiskDecorator::removeSharedRecursive(path, keep_s3);
DiskDecorator::removeSharedRecursive(path, keep_all_batch_metadata, file_names_remove_metadata_only);
}
void DiskRestartProxy::setLastModified(const String & path, const Poco::Timestamp & timestamp)

View File

@ -54,8 +54,8 @@ public:
void removeDirectory(const String & path) override;
void removeRecursive(const String & path) override;
void removeSharedFile(const String & path, bool keep_s3) override;
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_in_remote_fs) override;
void removeSharedRecursive(const String & path, bool keep_s3) override;
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_metadata, const NameSet & file_names_remove_metadata_only) override;
void removeSharedRecursive(const String & path, bool keep_all_batch_metadata, const NameSet & file_names_remove_metadata_only) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
Poco::Timestamp getLastModified(const String & path) override;
void setReadOnly(const String & path) override;

View File

@ -139,7 +139,7 @@ public:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
void removeSharedRecursive(const String &, bool) override
void removeSharedRecursive(const String &, bool, const NameSet &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}

View File

@ -197,7 +197,7 @@ public:
/// Remove file or directory with all children. Use with extra caution. Throws exception if file doesn't exists.
/// Differs from removeRecursive for S3/HDFS disks
/// Second bool param is a flag to remove (true) or keep (false) shared data on S3
virtual void removeSharedRecursive(const String & path, bool) { removeRecursive(path); }
virtual void removeSharedRecursive(const String & path, bool, const NameSet &) { removeRecursive(path); }
/// Remove file or directory if it exists.
/// Differs from removeFileIfExists for S3/HDFS disks
@ -237,14 +237,15 @@ public:
/// Batch request to remove multiple files.
/// May be much faster for blob storage.
virtual void removeSharedFiles(const RemoveBatchRequest & files, bool keep_in_remote_fs)
virtual void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_metadata, const NameSet & file_names_remove_metadata_only)
{
for (const auto & file : files)
{
bool keep_file = keep_all_batch_metadata || file_names_remove_metadata_only.contains(fs::path(file.path).filename());
if (file.if_exists)
removeSharedFileIfExists(file.path, keep_in_remote_fs);
removeSharedFileIfExists(file.path, keep_file);
else
removeSharedFile(file.path, keep_in_remote_fs);
removeSharedFile(file.path, keep_file);
}
}

View File

@ -271,7 +271,7 @@ std::unordered_map<String, String> IDiskRemote::getSerializedMetadata(const std:
return metadatas;
}
void IDiskRemote::removeMetadata(const String & path, std::vector<std::string> & paths_to_remove)
void IDiskRemote::removeMetadata(const String & path, std::unordered_map<String, std::vector<String>> & paths_to_remove)
{
LOG_TRACE(log, "Remove file by path: {}", backQuote(metadata_disk->getPath() + path));
@ -283,13 +283,13 @@ void IDiskRemote::removeMetadata(const String & path, std::vector<std::string> &
try
{
auto metadata_updater = [&paths_to_remove, this] (Metadata & metadata)
auto metadata_updater = [&paths_to_remove, path, this] (Metadata & metadata)
{
if (metadata.ref_count == 0)
{
for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects)
{
paths_to_remove.push_back(remote_fs_root_path + remote_fs_object_path);
paths_to_remove[path].push_back(remote_fs_root_path + remote_fs_object_path);
if (cache)
{
@ -328,7 +328,7 @@ void IDiskRemote::removeMetadata(const String & path, std::vector<std::string> &
}
void IDiskRemote::removeMetadataRecursive(const String & path, std::vector<String> & paths_to_remove)
void IDiskRemote::removeMetadataRecursive(const String & path, std::unordered_map<String, std::vector<String>> & paths_to_remove)
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
@ -483,27 +483,27 @@ void IDiskRemote::replaceFile(const String & from_path, const String & to_path)
void IDiskRemote::removeSharedFile(const String & path, bool delete_metadata_only)
{
std::vector<String> paths_to_remove;
std::unordered_map<String, std::vector<String>> paths_to_remove;
removeMetadata(path, paths_to_remove);
if (!delete_metadata_only)
removeFromRemoteFS(paths_to_remove);
removeFromRemoteFS(paths_to_remove[path]);
}
void IDiskRemote::removeSharedFileIfExists(const String & path, bool delete_metadata_only)
{
std::vector<String> paths_to_remove;
std::unordered_map<String, std::vector<String>> paths_to_remove;
if (metadata_disk->exists(path))
{
removeMetadata(path, paths_to_remove);
if (!delete_metadata_only)
removeFromRemoteFS(paths_to_remove);
removeFromRemoteFS(paths_to_remove[path]);
}
}
void IDiskRemote::removeSharedFiles(const RemoveBatchRequest & files, bool delete_metadata_only)
void IDiskRemote::removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_metadata, const NameSet & file_names_remove_metadata_only)
{
std::vector<String> paths_to_remove;
std::unordered_map<String, std::vector<String>> paths_to_remove;
for (const auto & file : files)
{
bool skip = file.if_exists && !metadata_disk->exists(file.path);
@ -511,17 +511,33 @@ void IDiskRemote::removeSharedFiles(const RemoveBatchRequest & files, bool delet
removeMetadata(file.path, paths_to_remove);
}
if (!delete_metadata_only)
removeFromRemoteFS(paths_to_remove);
if (!keep_all_batch_metadata)
{
std::vector<String> remove_from_remote;
for (auto && [path, remote_paths] : paths_to_remove)
{
if (!file_names_remove_metadata_only.contains(fs::path(path).filename()))
remove_from_remote.insert(remove_from_remote.end(), remote_paths.begin(), remote_paths.end());
}
removeFromRemoteFS(remove_from_remote);
}
}
void IDiskRemote::removeSharedRecursive(const String & path, bool delete_metadata_only)
void IDiskRemote::removeSharedRecursive(const String & path, bool keep_all_batch_metadata, const NameSet & file_names_remove_metadata_only)
{
std::vector<String> paths_to_remove;
std::unordered_map<String, std::vector<String>> paths_to_remove;
removeMetadataRecursive(path, paths_to_remove);
if (!delete_metadata_only)
removeFromRemoteFS(paths_to_remove);
if (!keep_all_batch_metadata)
{
std::vector<String> remove_from_remote;
for (auto && [local_path, remote_paths] : paths_to_remove)
{
if (!file_names_remove_metadata_only.contains(fs::path(local_path).filename()))
remove_from_remote.insert(remove_from_remote.end(), remote_paths.begin(), remote_paths.end());
}
removeFromRemoteFS(remove_from_remote);
}
}

View File

@ -107,15 +107,16 @@ public:
void removeFileIfExists(const String & path) override { removeSharedFileIfExists(path, false); }
void removeRecursive(const String & path) override { removeSharedRecursive(path, false); }
void removeRecursive(const String & path) override { removeSharedRecursive(path, false, {}); }
void removeSharedFile(const String & path, bool delete_metadata_only) override;
void removeSharedFileIfExists(const String & path, bool delete_metadata_only) override;
void removeSharedFiles(const RemoveBatchRequest & files, bool delete_metadata_only) override;
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_metadata, const NameSet & file_names_remove_metadata_only) override;
void removeSharedRecursive(const String & path, bool delete_metadata_only) override;
void removeSharedRecursive(const String & path, bool keep_all_batch_metadata, const NameSet & file_names_remove_metadata_only) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
@ -170,9 +171,9 @@ protected:
FileCachePtr cache;
private:
void removeMetadata(const String & path, std::vector<String> & paths_to_remove);
void removeMetadata(const String & path, std::unordered_map<String, std::vector<String>> & paths_to_remove);
void removeMetadataRecursive(const String & path, std::vector<String> & paths_to_remove);
void removeMetadataRecursive(const String & path, std::unordered_map<String, std::vector<String>> & paths_to_remove);
bool tryReserve(UInt64 bytes);

View File

@ -764,7 +764,7 @@ void DiskS3::restore()
bool cleanup_s3 = information.source_bucket != bucket || information.source_path != remote_fs_root_path;
for (const auto & root : data_roots)
if (exists(root))
removeSharedRecursive(root + '/', !cleanup_s3);
removeSharedRecursive(root + '/', !cleanup_s3, {});
restoreFiles(information);
restoreFileOperations(information);

View File

@ -826,7 +826,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
/// NOTE The is_cancelled flag also makes sense to check every time you read over the network,
/// performing a poll with a not very large timeout.
/// And now we check it only between read chunks (in the `copyData` function).
disk->removeSharedRecursive(part_download_path, true);
disk->removeSharedRecursive(part_download_path, true, {});
throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
}

View File

@ -1526,11 +1526,11 @@ catch (...)
throw;
}
std::pair<bool, std::vector<std::string>> IMergeTreeDataPart::canRemovePart() const
std::pair<bool, NameSet> IMergeTreeDataPart::canRemovePart() const
{
/// NOTE: It's needed for zero-copy replication
if (force_keep_shared_data)
return std::make_pair(false, std::vector<std::string>{});
return std::make_pair(false, NameSet{});
return storage.unlockSharedData(*this);
}
@ -1595,7 +1595,7 @@ void IMergeTreeDataPart::remove() const
LOG_WARNING(storage.log, "Directory {} (to which part must be renamed before removing) already exists. Most likely this is due to unclean restart or race condition. Removing it.", fullPath(disk, to));
try
{
disk->removeSharedRecursive(fs::path(to) / "", !can_remove);
disk->removeSharedRecursive(fs::path(to) / "", !can_remove, files_not_to_remove);
}
catch (...)
{
@ -1630,7 +1630,7 @@ void IMergeTreeDataPart::remove() const
if (checksums.empty())
{
/// If the part is not completely written, we cannot use fast path by listing files.
disk->removeSharedRecursive(fs::path(to) / "", !can_remove);
disk->removeSharedRecursive(fs::path(to) / "", !can_remove, files_not_to_remove);
}
else
{
@ -1659,7 +1659,7 @@ void IMergeTreeDataPart::remove() const
request.emplace_back(fs::path(to) / DELETE_ON_DESTROY_MARKER_FILE_NAME, true);
request.emplace_back(fs::path(to) / TXN_VERSION_METADATA_FILE_NAME, true);
disk->removeSharedFiles(request, !can_remove);
disk->removeSharedFiles(request, !can_remove, files_not_to_remove);
disk->removeDirectory(to);
}
catch (...)
@ -1668,7 +1668,7 @@ void IMergeTreeDataPart::remove() const
LOG_ERROR(storage.log, "Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: {}", fullPath(disk, to), getCurrentExceptionMessage(false));
disk->removeSharedRecursive(fs::path(to) / "", !can_remove);
disk->removeSharedRecursive(fs::path(to) / "", !can_remove, files_not_to_remove);
}
}
}
@ -1689,7 +1689,7 @@ void IMergeTreeDataPart::projectionRemove(const String & parent_to, bool keep_sh
"Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: checksums.txt is missing",
fullPath(disk, to));
/// If the part is not completely written, we cannot use fast path by listing files.
disk->removeSharedRecursive(fs::path(to) / "", keep_shared_data);
disk->removeSharedRecursive(fs::path(to) / "", keep_shared_data, {});
}
else
{
@ -1713,8 +1713,8 @@ void IMergeTreeDataPart::projectionRemove(const String & parent_to, bool keep_sh
request.emplace_back(fs::path(to) / DEFAULT_COMPRESSION_CODEC_FILE_NAME, true);
request.emplace_back(fs::path(to) / DELETE_ON_DESTROY_MARKER_FILE_NAME, true);
disk->removeSharedFiles(request, keep_shared_data);
disk->removeSharedRecursive(to, keep_shared_data);
disk->removeSharedFiles(request, keep_shared_data, {});
disk->removeSharedRecursive(to, keep_shared_data, {});
}
catch (...)
{
@ -1722,7 +1722,7 @@ void IMergeTreeDataPart::projectionRemove(const String & parent_to, bool keep_sh
LOG_ERROR(storage.log, "Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: {}", fullPath(disk, to), getCurrentExceptionMessage(false));
disk->removeSharedRecursive(fs::path(to) / "", keep_shared_data);
disk->removeSharedRecursive(fs::path(to) / "", keep_shared_data, {});
}
}
}

View File

@ -334,7 +334,7 @@ public:
struct HardlinkedFiles
{
std::string source_part_name;
std::vector<std::string> hardlinks_from_source_part;
NameSet hardlinks_from_source_part;
};
HardlinkedFiles hardlinked_files;
@ -520,7 +520,7 @@ protected:
String getRelativePathForDetachedPart(const String & prefix) const;
std::pair<bool, std::vector<std::string>> canRemovePart() const;
std::pair<bool, NameSet> canRemovePart() const;
void initializePartMetadataManager();

View File

@ -5762,7 +5762,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
for (auto it = disk->iterateDirectory(src_part_path); it->isValid(); it->next())
{
if (it->name() != IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME && it->name() != IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME)
dst_data_part->hardlinked_files.hardlinks_from_source_part.push_back(it->name());
dst_data_part->hardlinked_files.hardlinks_from_source_part.insert(it->name());
}
}

View File

@ -943,7 +943,7 @@ public:
/// Unlock shared data part in zookeeper
/// Overridden in StorageReplicatedMergeTree
virtual std::pair<bool, std::vector<std::string>> unlockSharedData(const IMergeTreeDataPart &) const { return std::make_pair(true, std::vector<std::string>{}); }
virtual std::pair<bool, NameSet> unlockSharedData(const IMergeTreeDataPart &) const { return std::make_pair(true, NameSet{}); }
/// Fetch part only if some replica has it on shared storage like S3
/// Overridden in StorageReplicatedMergeTree

View File

@ -1070,7 +1070,7 @@ private:
ctx->new_data_part->version.setCreationTID(tid, nullptr);
ctx->new_data_part->storeVersionMetadata();
std::vector<std::string> hardlinked_files;
NameSet hardlinked_files;
/// Create hardlinks for unchanged files
for (auto it = ctx->disk->iterateDirectory(ctx->source_part->getFullRelativePath()); it->isValid(); it->next())
{
@ -1101,7 +1101,7 @@ private:
if (!ctx->disk->isDirectory(it->path()))
{
ctx->disk->createHardLink(it->path(), destination);
hardlinked_files.push_back(it->name());
hardlinked_files.insert(it->name());
}
else if (!endsWith(".tmp_proj", it->name())) // ignore projection tmp merge dir
@ -1112,7 +1112,7 @@ private:
{
String p_destination = fs::path(destination) / p_it->name();
ctx->disk->createHardLink(p_it->path(), p_destination);
hardlinked_files.push_back(p_it->name());
hardlinked_files.insert(p_it->name());
}
}
}

View File

@ -7369,14 +7369,14 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part,
}
}
std::pair<bool, std::vector<std::string>> StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part) const
std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part) const
{
if (!part.volume || !part.isStoredOnDisk())
return std::make_pair(true, std::vector<std::string>{});
return std::make_pair(true, NameSet{});
DiskPtr disk = part.volume->getDisk();
if (!disk || !disk->supportZeroCopyReplication())
return std::make_pair(true, std::vector<std::string>{});
return std::make_pair(true, NameSet{});
/// If part is temporary refcount file may be absent
auto ref_count_path = fs::path(part.getFullRelativePath()) / IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK;
@ -7384,19 +7384,19 @@ std::pair<bool, std::vector<std::string>> StorageReplicatedMergeTree::unlockShar
{
auto ref_count = disk->getRefCount(ref_count_path);
if (ref_count > 0) /// Keep part shard info for frozen backups
return std::make_pair(false, std::vector<std::string>{});
return std::make_pair(false, NameSet{});
}
else
{
/// Temporary part with some absent file cannot be locked in shared mode
return std::make_pair(true, std::vector<std::string>{});
return std::make_pair(true, NameSet{});
}
return unlockSharedDataByID(part.getUniqueId(), getTableSharedID(), part.name, replica_name, disk, getZooKeeper(), *getSettings(), log,
zookeeper_path);
}
std::pair<bool, std::vector<std::string>> StorageReplicatedMergeTree::unlockSharedDataByID(
std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
String part_id, const String & table_uuid, const String & part_name,
const String & replica_name_, DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_ptr, const MergeTreeSettings & settings,
Poco::Logger * logger, const String & zookeeper_path_old)
@ -7406,13 +7406,14 @@ std::pair<bool, std::vector<std::string>> StorageReplicatedMergeTree::unlockShar
Strings zc_zookeeper_paths = getZeroCopyPartPath(settings, disk->getType(), table_uuid, part_name, zookeeper_path_old);
bool part_has_no_more_locks = true;
std::vector<std::string> files_not_to_remove;
NameSet files_not_to_remove;
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
{
files_not_to_remove.clear();
auto content = zookeeper_ptr->get(zc_zookeeper_path);
boost::split(files_not_to_remove, content, boost::is_any_of("\n "));
auto files_not_to_remove_str = zookeeper_ptr->get(zc_zookeeper_path);
boost::split(files_not_to_remove, files_not_to_remove_str, boost::is_any_of("\n "));
String zookeeper_part_uniq_node = fs::path(zc_zookeeper_path) / part_id;
/// Delete our replica node for part from zookeeper (we are not interested in it anymore)
@ -7848,7 +7849,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
}
void StorageReplicatedMergeTree::createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node, int32_t mode, bool replace_existing_lock, const std::string & path_to_set, const std::vector<std::string> & hardlinked_files)
void StorageReplicatedMergeTree::createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node, int32_t mode, bool replace_existing_lock, const std::string & path_to_set, const NameSet & hardlinked_files)
{
/// In rare case other replica can remove path between createAncestors and createIfNotExists
/// So we make up to 5 attempts
@ -8025,6 +8026,7 @@ bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const St
bool keep_shared = false;
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
NameSet files_not_to_remove;
fs::path checksums = fs::path(path) / IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK;
if (disk->exists(checksums))
@ -8033,7 +8035,7 @@ bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const St
{
String id = disk->getUniqueId(checksums);
bool can_remove = false;
std::tie(can_remove, std::ignore) = StorageReplicatedMergeTree::unlockSharedDataByID(id, table_uuid, part_name,
std::tie(can_remove, files_not_to_remove) = StorageReplicatedMergeTree::unlockSharedDataByID(id, table_uuid, part_name,
detached_replica_name, disk, zookeeper, getContext()->getReplicatedMergeTreeSettings(), log,
detached_zookeeper_path);
@ -8043,7 +8045,7 @@ bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const St
keep_shared = true;
}
disk->removeSharedRecursive(path, keep_shared);
disk->removeSharedRecursive(path, keep_shared, files_not_to_remove);
return keep_shared;
}

View File

@ -236,12 +236,12 @@ public:
/// Unlock shared data part in zookeeper
/// Return true if data unlocked
/// Return false if data is still used by another node
std::pair<bool, std::vector<std::string>> unlockSharedData(const IMergeTreeDataPart & part) const override;
std::pair<bool, NameSet> unlockSharedData(const IMergeTreeDataPart & part) const override;
/// Unlock shared data part in zookeeper by part id
/// Return true if data unlocked
/// Return false if data is still used by another node
static std::pair<bool, std::vector<std::string>> unlockSharedDataByID(String part_id, const String & table_uuid, const String & part_name, const String & replica_name_,
static std::pair<bool, NameSet> unlockSharedDataByID(String part_id, const String & table_uuid, const String & part_name, const String & replica_name_,
DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_, const MergeTreeSettings & settings, Poco::Logger * logger,
const String & zookeeper_path_old);
@ -756,7 +756,7 @@ private:
static Strings getZeroCopyPartPath(const MergeTreeSettings & settings, DiskType disk_type, const String & table_uuid,
const String & part_name, const String & zookeeper_path_old);
static void createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node, int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false, const std::string & path_to_set = "", const std::vector<std::string> & hardlinked_files = {});
static void createZeroCopyLockNode(const zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_node, int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false, const std::string & path_to_set = "", const NameSet & hardlinked_files = {});
bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed) override;

View File

@ -0,0 +1,3 @@
10000
10000
10000

View File

@ -0,0 +1,26 @@
drop table if exists partslost_0;
drop table if exists partslost_1;
drop table if exists partslost_2;
CREATE TABLE partslost_0 (x String) ENGINE=ReplicatedMergeTree('/clickhouse/table/partslost/0', '0') ORDER BY tuple() SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 1;
CREATE TABLE partslost_1 (x String) ENGINE=ReplicatedMergeTree('/clickhouse/table/partslost/0', '1') ORDER BY tuple() SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 1;
CREATE TABLE partslost_2 (x String) ENGINE=ReplicatedMergeTree('/clickhouse/table/partslost/0', '2') ORDER BY tuple() SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 1;
INSERT INTO partslost_0 SELECT toString(number) AS x from system.numbers LIMIT 10000;
ALTER TABLE partslost_0 ADD INDEX idx x TYPE tokenbf_v1(285000, 3, 12345) GRANULARITY 3;
ALTER TABLE partslost_0 MATERIALIZE INDEX idx;
select sleep(3) FORMAT Null;
select sleep(3) FORMAT Null;
select sleep(3) FORMAT Null;
select sleep(3) FORMAT Null;
ALTER TABLE partslost_0 DROP INDEX idx;
select count() from partslost_0;
select count() from partslost_1;
select count() from partslost_2;