Support hardlinking parts transactionally

This commit is contained in:
Michael Kolupaev 2023-05-16 19:27:30 +00:00
parent a2c3de5082
commit e84f0895e7
9 changed files with 76 additions and 28 deletions

View File

@ -411,25 +411,38 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::freeze(
bool make_source_readonly,
std::function<void(const DiskPtr &)> save_metadata_callback,
bool copy_instead_of_hardlink,
const NameSet & files_to_copy_instead_of_hardlinks) const
const NameSet & files_to_copy_instead_of_hardlinks,
DiskTransactionPtr external_transaction) const
{
auto disk = volume->getDisk();
disk->createDirectories(to);
if (external_transaction)
external_transaction->createDirectories(to);
else
disk->createDirectories(to);
localBackup(disk, getRelativePath(), fs::path(to) / dir_path, make_source_readonly, {}, copy_instead_of_hardlink, files_to_copy_instead_of_hardlinks);
localBackup(disk, getRelativePath(), fs::path(to) / dir_path, make_source_readonly, {}, copy_instead_of_hardlink, files_to_copy_instead_of_hardlinks, external_transaction);
if (save_metadata_callback)
save_metadata_callback(disk);
disk->removeFileIfExists(fs::path(to) / dir_path / "delete-on-destroy.txt");
disk->removeFileIfExists(fs::path(to) / dir_path / "txn_version.txt");
disk->removeFileIfExists(fs::path(to) / dir_path / IMergeTreeDataPart::METADATA_VERSION_FILE_NAME);
if (external_transaction)
{
external_transaction->removeFileIfExists(fs::path(to) / dir_path / "delete-on-destroy.txt");
external_transaction->removeFileIfExists(fs::path(to) / dir_path / "txn_version.txt");
external_transaction->removeFileIfExists(fs::path(to) / dir_path / IMergeTreeDataPart::METADATA_VERSION_FILE_NAME);
}
else
{
disk->removeFileIfExists(fs::path(to) / dir_path / "delete-on-destroy.txt");
disk->removeFileIfExists(fs::path(to) / dir_path / "txn_version.txt");
disk->removeFileIfExists(fs::path(to) / dir_path / IMergeTreeDataPart::METADATA_VERSION_FILE_NAME);
}
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
/// Do not initialize storage in case of DETACH because part may be broken.
bool to_detached = dir_path.starts_with("detached/");
return create(single_disk_volume, to, dir_path, /*initialize=*/ !to_detached);
return create(single_disk_volume, to, dir_path, /*initialize=*/ !to_detached && !external_transaction);
}
MutableDataPartStoragePtr DataPartStorageOnDiskBase::clonePart(

View File

@ -64,7 +64,8 @@ public:
bool make_source_readonly,
std::function<void(const DiskPtr &)> save_metadata_callback,
bool copy_instead_of_hardlink,
const NameSet & files_to_copy_instead_of_hardlinks) const override;
const NameSet & files_to_copy_instead_of_hardlinks,
DiskTransactionPtr external_transaction) const override;
MutableDataPartStoragePtr clonePart(
const std::string & to,

View File

@ -11,6 +11,7 @@
#include <memory>
#include <optional>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Disks/IDiskTransaction.h>
namespace DB
{
@ -212,13 +213,18 @@ public:
/// implementation which relies on paths of some blobs in S3. For example if we want to hardlink
/// the whole part during mutation we shouldn't hardlink checksums.txt, because otherwise
/// zero-copy locks for different parts will be on the same path in zookeeper.
///
/// If `external_transaction` is provided, the disk operations (creating directories, hardlinking,
/// etc) won't be applied immediately; instead, they'll be added to external_transaction, which the
/// caller then needs to commit.
virtual std::shared_ptr<IDataPartStorage> freeze(
const std::string & to,
const std::string & dir_path,
bool make_source_readonly,
std::function<void(const DiskPtr &)> save_metadata_callback,
bool copy_instead_of_hardlink,
const NameSet & files_to_copy_instead_of_hardlinks) const = 0;
const NameSet & files_to_copy_instead_of_hardlinks,
DiskTransactionPtr external_transaction = nullptr) const = 0;
/// Make a full copy of a data part into 'to/dir_path' (possibly to a different disk).
virtual std::shared_ptr<IDataPartStorage> clonePart(

View File

@ -161,7 +161,7 @@ public:
void remove();
/// Initialize columns (from columns.txt if exists, or create from column files if not).
/// Load checksums from checksums.txt if exists. Load index if required.
/// Load various metadata into memory: checksums from checksums.txt, index if required, etc.
void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency);
void appendFilesOfColumnsChecksumsIndexes(Strings & files, bool include_projection = false) const;

View File

@ -17,9 +17,10 @@ namespace
{
void localBackupImpl(
const DiskPtr & disk, const String & source_path,
const DiskPtr & disk, IDiskTransaction * transaction, const String & source_path,
const String & destination_path, bool make_source_readonly, size_t level,
std::optional<size_t> max_level, const NameSet & files_to_copy_instead_of_hardlinks)
std::optional<size_t> max_level, bool copy_instead_of_hardlinks,
const NameSet & files_to_copy_instead_of_hardlinks)
{
if (max_level && level > *max_level)
return;
@ -27,7 +28,10 @@ void localBackupImpl(
if (level >= 1000)
throw DB::Exception(DB::ErrorCodes::TOO_DEEP_RECURSION, "Too deep recursion");
disk->createDirectories(destination_path);
if (transaction)
transaction->createDirectories(destination_path);
else
disk->createDirectories(destination_path);
for (auto it = disk->iterateDirectory(source_path); it->isValid(); it->next())
{
@ -37,15 +41,36 @@ void localBackupImpl(
if (!disk->isDirectory(source))
{
if (make_source_readonly)
disk->setReadOnly(source);
if (files_to_copy_instead_of_hardlinks.contains(it->name()))
disk->copyFile(source, *disk, destination);
{
if (transaction)
transaction->setReadOnly(source);
else
disk->setReadOnly(source);
}
if (copy_instead_of_hardlinks || files_to_copy_instead_of_hardlinks.contains(it->name()))
{
if (transaction)
{
transaction->copyFile(source, destination);
}
else
{
disk->copyFile(source, *disk, destination);
}
}
else
disk->createHardLink(source, destination);
{
if (transaction)
transaction->createHardLink(source, destination);
else
disk->createHardLink(source, destination);
}
}
else
{
localBackupImpl(disk, source, destination, make_source_readonly, level + 1, max_level, files_to_copy_instead_of_hardlinks);
localBackupImpl(
disk, transaction, source, destination, make_source_readonly, level + 1, max_level,
copy_instead_of_hardlinks, files_to_copy_instead_of_hardlinks);
}
}
}
@ -89,7 +114,7 @@ private:
void localBackup(
const DiskPtr & disk, const String & source_path,
const String & destination_path, bool make_source_readonly,
std::optional<size_t> max_level, bool copy_instead_of_hardlinks, const NameSet & files_to_copy_intead_of_hardlinks)
std::optional<size_t> max_level, bool copy_instead_of_hardlinks, const NameSet & files_to_copy_intead_of_hardlinks, DiskTransactionPtr disk_transaction)
{
if (disk->exists(destination_path) && !disk->isDirectoryEmpty(destination_path))
{
@ -100,7 +125,8 @@ void localBackup(
size_t try_no = 0;
const size_t max_tries = 10;
CleanupOnFail cleanup([disk, destination_path]() { disk->removeRecursive(destination_path); });
CleanupOnFail cleanup(disk_transaction ? std::function<void()>([]{}) :
[disk, destination_path]() { disk->removeRecursive(destination_path); });
/** Files in the directory can be permanently added and deleted.
* If some file is deleted during an attempt to make a backup, then try again,
@ -110,10 +136,10 @@ void localBackup(
{
try
{
if (copy_instead_of_hardlinks)
if (copy_instead_of_hardlinks && !disk_transaction)
disk->copyDirectoryContent(source_path, disk, destination_path);
else
localBackupImpl(disk, source_path, destination_path, make_source_readonly, 0, max_level, files_to_copy_intead_of_hardlinks);
localBackupImpl(disk, disk_transaction.get(), source_path, destination_path, make_source_readonly, 0, max_level, copy_instead_of_hardlinks, files_to_copy_intead_of_hardlinks);
}
catch (const DB::ErrnoException & e)
{

View File

@ -9,7 +9,7 @@ namespace DB
/** Creates a local (at the same mount point) backup (snapshot) directory.
*
* In the specified destination directory, it creates a hard links on all source-directory files
* In the specified destination directory, it creates hard links on all source-directory files
* and in all nested directories, with saving (creating) all relative paths;
* and also `chown`, removing the write permission.
*
@ -17,9 +17,11 @@ namespace DB
* and is intended to be used as a simple means of protection against a human or program error,
* but not from a hardware failure.
*
* If max_level is specified, than only files which depth relative source_path less or equal max_level will be copied.
* If max_level is specified, than only files with depth relative source_path less or equal max_level will be copied.
* So, if max_level=0 than only direct file child are copied.
*
* If `transaction` is provided, the changes will be added to it instead of performend on disk.
*/
void localBackup(const DiskPtr & disk, const String & source_path, const String & destination_path, bool make_source_readonly = true, std::optional<size_t> max_level = {}, bool copy_instead_of_hardlinks = false, const NameSet & files_to_copy_intead_of_hardlinks = {});
void localBackup(const DiskPtr & disk, const String & source_path, const String & destination_path, bool make_source_readonly = true, std::optional<size_t> max_level = {}, bool copy_instead_of_hardlinks = false, const NameSet & files_to_copy_intead_of_hardlinks = {}, DiskTransactionPtr disk_transaction = nullptr);
}

View File

@ -42,7 +42,7 @@ node = cluster.add_instance(
main_configs=["configs/grpc_config.xml"],
# Bug in TSAN reproduces in this test https://github.com/grpc/grpc/issues/29550#issuecomment-1188085387
env_variables={
"TSAN_OPTIONS": "report_atomic_races=0 " + os.getenv("TSAN_OPTIONS")
"TSAN_OPTIONS": "report_atomic_races=0 " + os.getenv("TSAN_OPTIONS", default="")
},
)
main_channel = None

View File

@ -44,7 +44,7 @@ node = cluster.add_instance(
],
# Bug in TSAN reproduces in this test https://github.com/grpc/grpc/issues/29550#issuecomment-1188085387
env_variables={
"TSAN_OPTIONS": "report_atomic_races=0 " + os.getenv("TSAN_OPTIONS")
"TSAN_OPTIONS": "report_atomic_races=0 " + os.getenv("TSAN_OPTIONS", default="")
},
)

View File

@ -36,7 +36,7 @@ instance = cluster.add_instance(
with_zookeeper=True,
# Bug in TSAN reproduces in this test https://github.com/grpc/grpc/issues/29550#issuecomment-1188085387
env_variables={
"TSAN_OPTIONS": "report_atomic_races=0 " + os.getenv("TSAN_OPTIONS")
"TSAN_OPTIONS": "report_atomic_races=0 " + os.getenv("TSAN_OPTIONS", default="")
},
)