Merge pull request #41066 from vitlibar/no-hardlinks-while-making-backup-of-mergetree-in-atomic-db

No hardlinks while making backup of MergeTree in atomic database.
This commit is contained in:
Alexander Tokmakov 2022-09-09 14:25:11 +03:00 committed by GitHub
commit d6dbde4289
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 148 additions and 34 deletions

View File

@ -0,0 +1,37 @@
#pragma once
#include <Backups/IBackupEntry.h>
namespace DB
{
/// Wraps another backup entry together with a value of an arbitrary type `T`.
///
/// Every IBackupEntry query (size, checksum, read buffer, file path, disk, data source description)
/// is forwarded unchanged to the wrapped entry. The custom value is never read by this class:
/// it is only stored, so it lives exactly as long as the wrapper does (e.g. to keep smart pointers
/// alive for as long as the backup entry is in use).
template <typename T>
class BackupEntryWrappedWith : public IBackupEntry
{
public:
    /// Stores `entry_` and a copy of `custom_value_`.
    /// `entry_` is a by-value sink parameter, so it is moved into the member instead of being copied again.
    BackupEntryWrappedWith(BackupEntryPtr entry_, const T & custom_value_) : entry(std::move(entry_)), custom_value(custom_value_) { }

    /// Stores `entry_` and takes over `custom_value_` by moving from it.
    BackupEntryWrappedWith(BackupEntryPtr entry_, T && custom_value_) : entry(std::move(entry_)), custom_value(std::move(custom_value_)) { }

    ~BackupEntryWrappedWith() override = default;

    /// All the functions below simply delegate to the wrapped entry.
    UInt64 getSize() const override { return entry->getSize(); }
    std::optional<UInt128> getChecksum() const override { return entry->getChecksum(); }
    std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override { return entry->getReadBuffer(); }
    String getFilePath() const override { return entry->getFilePath(); }
    DiskPtr tryGetDiskIfExists() const override { return entry->tryGetDiskIfExists(); }
    DataSourceDescription getDataSourceDescription() const override { return entry->getDataSourceDescription(); }

private:
    /// The wrapped backup entry which does the real work.
    BackupEntryPtr entry;
    /// Payload kept alive alongside the entry; never accessed by this class.
    T custom_value;
};
/// Replaces, in place, every entry of `backup_entries` with a BackupEntryWrappedWith<T>
/// holding the original entry plus a copy of `custom_value`.
/// The first element of each pair is left untouched.
template <typename T>
void wrapBackupEntriesWith(std::vector<std::pair<String, BackupEntryPtr>> & backup_entries, const T & custom_value)
{
    for (auto & name_and_entry : backup_entries)
    {
        auto & entry_slot = name_and_entry.second;
        /// The old pointer is moved into the wrapper, then the slot is overwritten with the wrapper itself.
        entry_slot = std::make_shared<BackupEntryWrappedWith<T>>(std::move(entry_slot), custom_value);
    }
}
}

View File

@ -650,23 +650,31 @@ bool DataPartStorageOnDisk::shallParticipateInMerges(const IStoragePolicy & stor
}
void DataPartStorageOnDisk::backup(
TemporaryFilesOnDisks & temp_dirs,
const MergeTreeDataPartChecksums & checksums,
const NameSet & files_without_checksums,
const String & path_in_backup,
BackupEntries & backup_entries) const
BackupEntries & backup_entries,
bool make_temporary_hard_links,
TemporaryFilesOnDisks * temp_dirs) const
{
fs::path part_path_on_disk = fs::path{root_path} / part_dir;
fs::path part_path_in_backup = fs::path{path_in_backup} / part_dir;
auto disk = volume->getDisk();
auto temp_dir_it = temp_dirs.find(disk);
if (temp_dir_it == temp_dirs.end())
temp_dir_it = temp_dirs.emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first;
auto temp_dir_owner = temp_dir_it->second;
fs::path temp_dir = temp_dir_owner->getPath();
fs::path temp_part_dir = temp_dir / part_path_in_backup.relative_path();
disk->createDirectories(temp_part_dir);
fs::path temp_part_dir;
std::shared_ptr<TemporaryFileOnDisk> temp_dir_owner;
if (make_temporary_hard_links)
{
assert(temp_dirs);
auto temp_dir_it = temp_dirs->find(disk);
if (temp_dir_it == temp_dirs->end())
temp_dir_it = temp_dirs->emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first;
temp_dir_owner = temp_dir_it->second;
fs::path temp_dir = temp_dir_owner->getPath();
temp_part_dir = temp_dir / part_path_in_backup.relative_path();
disk->createDirectories(temp_part_dir);
}
/// For example,
/// part_path_in_backup = /data/test/table/0_1_1_0
@ -683,13 +691,18 @@ void DataPartStorageOnDisk::backup(
continue; /// Skip *.proj files - they're actually directories and will be handled.
String filepath_on_disk = part_path_on_disk / filepath;
String filepath_in_backup = part_path_in_backup / filepath;
String hardlink_filepath = temp_part_dir / filepath;
disk->createHardLink(filepath_on_disk, hardlink_filepath);
if (make_temporary_hard_links)
{
String hardlink_filepath = temp_part_dir / filepath;
disk->createHardLink(filepath_on_disk, hardlink_filepath);
filepath_on_disk = hardlink_filepath;
}
UInt128 file_hash{checksum.file_hash.first, checksum.file_hash.second};
backup_entries.emplace_back(
filepath_in_backup,
std::make_unique<BackupEntryFromImmutableFile>(disk, hardlink_filepath, checksum.file_size, file_hash, temp_dir_owner));
std::make_unique<BackupEntryFromImmutableFile>(disk, filepath_on_disk, checksum.file_size, file_hash, temp_dir_owner));
}
for (const auto & filepath : files_without_checksums)

View File

@ -89,11 +89,12 @@ public:
bool shallParticipateInMerges(const IStoragePolicy &) const override;
void backup(
TemporaryFilesOnDisks & temp_dirs,
const MergeTreeDataPartChecksums & checksums,
const NameSet & files_without_checksums,
const String & path_in_backup,
BackupEntries & backup_entries) const override;
BackupEntries & backup_entries,
bool make_temporary_hard_links,
TemporaryFilesOnDisks * temp_dirs) const override;
DataPartStoragePtr freeze(
const std::string & to,

View File

@ -177,11 +177,12 @@ public:
/// Also creates a new tmp_dir for internal disk (if disk is mentioned the first time).
using TemporaryFilesOnDisks = std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>>;
virtual void backup(
TemporaryFilesOnDisks & temp_dirs,
const MergeTreeDataPartChecksums & checksums,
const NameSet & files_without_checksums,
const String & path_in_backup,
BackupEntries & backup_entries) const = 0;
BackupEntries & backup_entries,
bool make_temporary_hard_links,
TemporaryFilesOnDisks * temp_dirs) const = 0;
/// Creates hardlinks into 'to/dir_path' for every file in data part.
/// Callback is called after hardlinks are created, but before 'delete-on-destroy.txt' marker is removed.

View File

@ -3,6 +3,7 @@
#include <Backups/BackupEntriesCollector.h>
#include <Backups/BackupEntryFromImmutableFile.h>
#include <Backups/BackupEntryFromSmallFile.h>
#include <Backups/BackupEntryWrappedWith.h>
#include <Backups/IBackup.h>
#include <Backups/RestorerFromBackup.h>
#include <Compression/CompressedReadBuffer.h>
@ -4109,29 +4110,74 @@ void MergeTreeData::backupData(BackupEntriesCollector & backup_entries_collector
else
data_parts = getVisibleDataPartsVector(local_context);
backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup));
backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup, local_context));
}
BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup)
BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup, const ContextPtr & local_context)
{
BackupEntries backup_entries;
std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>> temp_dirs;
TableLockHolder table_lock;
for (const auto & part : data_parts)
{
/// Hard links are the default way to ensure that we'll be keeping access to the files of parts.
bool make_temporary_hard_links = true;
bool hold_storage_and_part_ptrs = false;
bool hold_table_lock = false;
if (getStorageID().hasUUID())
{
/// Tables in atomic databases have UUIDs. When using an atomic database we don't have to create hard links to make a backup,
/// we can just hold smart pointers to the storage and to the data parts instead. That's enough to protect those files from being deleted
/// until the backup is done (see the calls `part.unique()` in grabOldParts() and table.unique() in DatabaseCatalog).
make_temporary_hard_links = false;
hold_storage_and_part_ptrs = true;
}
else if (supportsReplication() && part->data_part_storage->supportZeroCopyReplication() && getSettings()->allow_remote_fs_zero_copy_replication)
{
/// Hard links don't work correctly with zero copy replication.
make_temporary_hard_links = false;
hold_storage_and_part_ptrs = true;
hold_table_lock = true;
}
if (hold_table_lock && !table_lock)
table_lock = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
BackupEntries backup_entries_from_part;
part->data_part_storage->backup(
temp_dirs, part->checksums, part->getFileNamesWithoutChecksums(), data_path_in_backup, backup_entries);
part->checksums,
part->getFileNamesWithoutChecksums(),
data_path_in_backup,
backup_entries_from_part,
make_temporary_hard_links,
&temp_dirs);
auto projection_parts = part->getProjectionParts();
for (const auto & [projection_name, projection_part] : projection_parts)
{
projection_part->data_part_storage->backup(
temp_dirs,
projection_part->checksums,
projection_part->getFileNamesWithoutChecksums(),
fs::path{data_path_in_backup} / part->name,
backup_entries);
backup_entries_from_part,
make_temporary_hard_links,
&temp_dirs);
}
if (hold_storage_and_part_ptrs)
{
/// Wrap backup entries with smart pointers to data parts and to the storage itself
/// (we'll be holding those smart pointers for as long as we'll be using the backup entries).
auto storage_and_part = std::make_pair(shared_from_this(), part);
if (hold_table_lock)
wrapBackupEntriesWith(backup_entries_from_part, std::make_pair(storage_and_part, table_lock));
else
wrapBackupEntriesWith(backup_entries_from_part, storage_and_part);
}
insertAtEnd(backup_entries, std::move(backup_entries_from_part));
}
return backup_entries;

View File

@ -1231,7 +1231,7 @@ protected:
bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space);
/// Makes backup entries to backup the parts of this table.
static BackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup);
BackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup, const ContextPtr & local_context);
class RestoredPartsHolder;

View File

@ -1785,7 +1785,7 @@ void StorageMergeTree::backupData(BackupEntriesCollector & backup_entries_collec
for (const auto & data_part : data_parts)
min_data_version = std::min(min_data_version, data_part->info.getDataVersion());
backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup));
backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup, local_context));
backup_entries_collector.addBackupEntries(backupMutations(min_data_version + 1, data_path_in_backup));
}

View File

@ -8290,7 +8290,7 @@ void StorageReplicatedMergeTree::backupData(
else
data_parts = getVisibleDataPartsVector(local_context);
auto backup_entries = backupParts(data_parts, "");
auto backup_entries = backupParts(data_parts, /* data_path_in_backup */ "", local_context);
auto coordination = backup_entries_collector.getBackupCoordination();
String shared_id = getTableSharedID();

View File

@ -29,7 +29,6 @@ def generate_cluster_def():
main_configs = ["configs/backups_disk.xml", generate_cluster_def()]
user_configs = ["configs/allow_database_types.xml"]
nodes = []
@ -175,11 +174,21 @@ def test_concurrent_backups_on_different_nodes():
@pytest.mark.parametrize(
"db_engine, table_engine",
[("Replicated", "ReplicatedMergeTree"), ("Ordinary", "MergeTree")],
[
("Ordinary", "MergeTree"),
("Atomic", "MergeTree"),
("Replicated", "ReplicatedMergeTree"),
("Memory", "MergeTree"),
("Lazy", "Log"),
],
)
def test_create_or_drop_tables_during_backup(db_engine, table_engine):
if db_engine == "Replicated":
db_engine = "Replicated('/clickhouse/path/','{shard}','{replica}')"
if db_engine == "Lazy":
db_engine = "Lazy(20)"
if table_engine.endswith("MergeTree"):
table_engine += " ORDER BY tuple()"
@ -189,7 +198,7 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
start_time = time.time()
end_time = start_time + 60
def create_table():
def create_tables():
while time.time() < end_time:
node = nodes[randint(0, num_nodes - 1)]
table_name = f"mydb.tbl{randint(1, num_nodes)}"
@ -200,13 +209,13 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
f"INSERT INTO {table_name} SELECT rand32() FROM numbers(10)"
)
def drop_table():
def drop_tables():
while time.time() < end_time:
table_name = f"mydb.tbl{randint(1, num_nodes)}"
node = nodes[randint(0, num_nodes - 1)]
node.query(f"DROP TABLE IF EXISTS {table_name} NO DELAY")
def rename_table():
def rename_tables():
while time.time() < end_time:
table_name1 = f"mydb.tbl{randint(1, num_nodes)}"
table_name2 = f"mydb.tbl{randint(1, num_nodes)}"
@ -215,7 +224,13 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
f"RENAME TABLE {table_name1} TO {table_name2}"
)
def make_backup():
def truncate_tables():
while time.time() < end_time:
table_name = f"mydb.tbl{randint(1, num_nodes)}"
node = nodes[randint(0, num_nodes - 1)]
node.query(f"TRUNCATE TABLE IF EXISTS {table_name} NO DELAY")
def make_backups():
ids = []
while time.time() < end_time:
time.sleep(
@ -231,11 +246,12 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
ids = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = []
ids_future = executor.submit(make_backup)
ids_future = executor.submit(make_backups)
futures.append(ids_future)
futures.append(executor.submit(create_table))
futures.append(executor.submit(drop_table))
futures.append(executor.submit(rename_table))
futures.append(executor.submit(create_tables))
futures.append(executor.submit(drop_tables))
futures.append(executor.submit(rename_tables))
futures.append(executor.submit(truncate_tables))
for future in futures:
future.result()
ids = ids_future.result()