diff --git a/src/Backups/BackupEntryWrappedWith.h b/src/Backups/BackupEntryWrappedWith.h
new file mode 100644
index 00000000000..97244650b6b
--- /dev/null
+++ b/src/Backups/BackupEntryWrappedWith.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include <Backups/IBackupEntry.h>
+
+
+namespace DB
+{
+
+/// Wraps another backup entry and a value of any type.
+template <typename T>
+class BackupEntryWrappedWith : public IBackupEntry
+{
+public:
+    BackupEntryWrappedWith(BackupEntryPtr entry_, const T & custom_value_) : entry(entry_), custom_value(custom_value_) { }
+    BackupEntryWrappedWith(BackupEntryPtr entry_, T && custom_value_) : entry(entry_), custom_value(std::move(custom_value_)) { }
+    ~BackupEntryWrappedWith() override = default;
+
+    UInt64 getSize() const override { return entry->getSize(); }
+    std::optional<UInt128> getChecksum() const override { return entry->getChecksum(); }
+    std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override { return entry->getReadBuffer(); }
+    String getFilePath() const override { return entry->getFilePath(); }
+    DiskPtr tryGetDiskIfExists() const override { return entry->tryGetDiskIfExists(); }
+    DataSourceDescription getDataSourceDescription() const override { return entry->getDataSourceDescription(); }
+
+private:
+    BackupEntryPtr entry;
+    T custom_value;
+};
+
+template <typename T>
+void wrapBackupEntriesWith(std::vector<std::pair<String, BackupEntryPtr>> & backup_entries, const T & custom_value)
+{
+    for (auto & [_, backup_entry] : backup_entries)
+        backup_entry = std::make_shared<BackupEntryWrappedWith<T>>(std::move(backup_entry), custom_value);
+}
+
+}
diff --git a/src/Storages/MergeTree/DataPartStorageOnDisk.cpp b/src/Storages/MergeTree/DataPartStorageOnDisk.cpp
index 0154fd6e281..894eec12f0c 100644
--- a/src/Storages/MergeTree/DataPartStorageOnDisk.cpp
+++ b/src/Storages/MergeTree/DataPartStorageOnDisk.cpp
@@ -650,23 +650,31 @@ bool DataPartStorageOnDisk::shallParticipateInMerges(const IStoragePolicy & stor
 }
 
 void DataPartStorageOnDisk::backup(
-    TemporaryFilesOnDisks & temp_dirs,
     const MergeTreeDataPartChecksums & checksums,
     const NameSet & files_without_checksums,
     const String & path_in_backup,
-    BackupEntries & backup_entries) const
+    BackupEntries & backup_entries,
+    bool make_temporary_hard_links,
+    TemporaryFilesOnDisks * temp_dirs) const
 {
     fs::path part_path_on_disk = fs::path{root_path} / part_dir;
     fs::path part_path_in_backup = fs::path{path_in_backup} / part_dir;
 
     auto disk = volume->getDisk();
-    auto temp_dir_it = temp_dirs.find(disk);
-    if (temp_dir_it == temp_dirs.end())
-        temp_dir_it = temp_dirs.emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first;
-    auto temp_dir_owner = temp_dir_it->second;
-    fs::path temp_dir = temp_dir_owner->getPath();
-    fs::path temp_part_dir = temp_dir / part_path_in_backup.relative_path();
-    disk->createDirectories(temp_part_dir);
+
+    fs::path temp_part_dir;
+    std::shared_ptr<TemporaryFileOnDisk> temp_dir_owner;
+    if (make_temporary_hard_links)
+    {
+        assert(temp_dirs);
+        auto temp_dir_it = temp_dirs->find(disk);
+        if (temp_dir_it == temp_dirs->end())
+            temp_dir_it = temp_dirs->emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first;
+        temp_dir_owner = temp_dir_it->second;
+        fs::path temp_dir = temp_dir_owner->getPath();
+        temp_part_dir = temp_dir / part_path_in_backup.relative_path();
+        disk->createDirectories(temp_part_dir);
+    }
 
     /// For example,
     /// part_path_in_backup = /data/test/table/0_1_1_0
@@ -683,13 +691,18 @@ void DataPartStorageOnDisk::backup(
             continue; /// Skip *.proj files - they're actually directories and will be handled.
         String filepath_on_disk = part_path_on_disk / filepath;
         String filepath_in_backup = part_path_in_backup / filepath;
 
-        String hardlink_filepath = temp_part_dir / filepath;
-        disk->createHardLink(filepath_on_disk, hardlink_filepath);
+        if (make_temporary_hard_links)
+        {
+            String hardlink_filepath = temp_part_dir / filepath;
+            disk->createHardLink(filepath_on_disk, hardlink_filepath);
+            filepath_on_disk = hardlink_filepath;
+        }
+
         UInt128 file_hash{checksum.file_hash.first, checksum.file_hash.second};
         backup_entries.emplace_back(
             filepath_in_backup,
-            std::make_unique<BackupEntryFromImmutableFile>(disk, hardlink_filepath, checksum.file_size, file_hash, temp_dir_owner));
+            std::make_unique<BackupEntryFromImmutableFile>(disk, filepath_on_disk, checksum.file_size, file_hash, temp_dir_owner));
     }
 
     for (const auto & filepath : files_without_checksums)
diff --git a/src/Storages/MergeTree/DataPartStorageOnDisk.h b/src/Storages/MergeTree/DataPartStorageOnDisk.h
index 5b5cff8e636..f02ef26f811 100644
--- a/src/Storages/MergeTree/DataPartStorageOnDisk.h
+++ b/src/Storages/MergeTree/DataPartStorageOnDisk.h
@@ -89,11 +89,12 @@ public:
     bool shallParticipateInMerges(const IStoragePolicy &) const override;
 
     void backup(
-        TemporaryFilesOnDisks & temp_dirs,
         const MergeTreeDataPartChecksums & checksums,
         const NameSet & files_without_checksums,
         const String & path_in_backup,
-        BackupEntries & backup_entries) const override;
+        BackupEntries & backup_entries,
+        bool make_temporary_hard_links,
+        TemporaryFilesOnDisks * temp_dirs) const override;
 
     DataPartStoragePtr freeze(
         const std::string & to,
diff --git a/src/Storages/MergeTree/IDataPartStorage.h b/src/Storages/MergeTree/IDataPartStorage.h
index 946c1c5fd47..9da8a5eae03 100644
--- a/src/Storages/MergeTree/IDataPartStorage.h
+++ b/src/Storages/MergeTree/IDataPartStorage.h
@@ -177,11 +177,12 @@ public:
     /// Also creates a new tmp_dir for internal disk (if disk is mentioned the first time).
     using TemporaryFilesOnDisks = std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>>;
     virtual void backup(
-        TemporaryFilesOnDisks & temp_dirs,
         const MergeTreeDataPartChecksums & checksums,
         const NameSet & files_without_checksums,
         const String & path_in_backup,
-        BackupEntries & backup_entries) const = 0;
+        BackupEntries & backup_entries,
+        bool make_temporary_hard_links,
+        TemporaryFilesOnDisks * temp_dirs) const = 0;
 
     /// Creates hardlinks into 'to/dir_path' for every file in data part.
     /// Callback is called after hardlinks are created, but before 'delete-on-destroy.txt' marker is removed.
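
Note (editorial addition, not part of the patch): the wrapper above exists purely for lifetime extension. It forwards every IBackupEntry call to the inner entry and destroys the attached value only together with itself. A minimal usage sketch, assuming only the names declared in the new header; `entry`, `storage` and `part` are hypothetical placeholders:

    // Keep `storage` and `part` alive for as long as the backup entry is in use.
    auto keep_alive = std::make_pair(storage, part);
    BackupEntryPtr wrapped =
        std::make_shared<BackupEntryWrappedWith<decltype(keep_alive)>>(std::move(entry), keep_alive);
    // `wrapped` behaves exactly like the original entry; `keep_alive` (and the
    // objects it holds) is released only with the last reference to `wrapped`.
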
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index edc070a8acf..78821ffcb74 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -3,6 +3,7 @@
 #include <Backups/BackupEntriesCollector.h>
 #include <Backups/BackupEntryFromImmutableFile.h>
 #include <Backups/BackupEntryFromSmallFile.h>
+#include <Backups/BackupEntryWrappedWith.h>
 #include <Backups/IBackup.h>
 #include <Backups/RestorerFromBackup.h>
 #include <Compression/CompressedReadBuffer.h>
@@ -4109,29 +4110,74 @@ void MergeTreeData::backupData(BackupEntriesCollector & backup_entries_collector
     else
         data_parts = getVisibleDataPartsVector(local_context);
 
-    backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup));
+    backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup, local_context));
 }
 
-BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup)
+BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup, const ContextPtr & local_context)
 {
     BackupEntries backup_entries;
     std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>> temp_dirs;
+    TableLockHolder table_lock;
 
     for (const auto & part : data_parts)
     {
+        /// Hard links are the default way to ensure that we keep access to the files of parts.
+        bool make_temporary_hard_links = true;
+        bool hold_storage_and_part_ptrs = false;
+        bool hold_table_lock = false;
+
+        if (getStorageID().hasUUID())
+        {
+            /// Tables in atomic databases have UUIDs. When using an atomic database we don't have to create hard links to make a backup,
+            /// we can just hold smart pointers to the storage and to its data parts instead. That's enough to protect those files from deletion
+            /// until the backup is done (see the calls `part.unique()` in grabOldParts() and `table.unique()` in DatabaseCatalog).
+            make_temporary_hard_links = false;
+            hold_storage_and_part_ptrs = true;
+        }
+        else if (supportsReplication() && part->data_part_storage->supportZeroCopyReplication() && getSettings()->allow_remote_fs_zero_copy_replication)
+        {
+            /// Hard links don't work correctly with zero-copy replication.
+            make_temporary_hard_links = false;
+            hold_storage_and_part_ptrs = true;
+            hold_table_lock = true;
+        }
+
+        if (hold_table_lock && !table_lock)
+            table_lock = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
+
+        BackupEntries backup_entries_from_part;
         part->data_part_storage->backup(
-            temp_dirs, part->checksums, part->getFileNamesWithoutChecksums(), data_path_in_backup, backup_entries);
+            part->checksums,
+            part->getFileNamesWithoutChecksums(),
+            data_path_in_backup,
+            backup_entries_from_part,
+            make_temporary_hard_links,
+            &temp_dirs);
 
         auto projection_parts = part->getProjectionParts();
         for (const auto & [projection_name, projection_part] : projection_parts)
         {
             projection_part->data_part_storage->backup(
-                temp_dirs, projection_part->checksums, projection_part->getFileNamesWithoutChecksums(), fs::path{data_path_in_backup} / part->name,
-                backup_entries);
+                projection_part->checksums,
+                projection_part->getFileNamesWithoutChecksums(),
+                fs::path{data_path_in_backup} / part->name,
+                backup_entries_from_part,
+                make_temporary_hard_links,
+                &temp_dirs);
         }
+
+        if (hold_storage_and_part_ptrs)
+        {
+            /// Wrap backup entries with smart pointers to data parts and to the storage itself
+            /// (we'll be holding those smart pointers for as long as we'll be using the backup entries).
+            auto storage_and_part = std::make_pair(shared_from_this(), part);
+            if (hold_table_lock)
+                wrapBackupEntriesWith(backup_entries_from_part, std::make_pair(storage_and_part, table_lock));
+            else
+                wrapBackupEntriesWith(backup_entries_from_part, storage_and_part);
+        }
+
+        insertAtEnd(backup_entries, std::move(backup_entries_from_part));
     }
 
     return backup_entries;
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index aca62153782..3a35daf4c90 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -1231,7 +1231,7 @@ protected:
     bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space);
 
     /// Makes backup entries to backup the parts of this table.
-    static BackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup);
+    BackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup, const ContextPtr & local_context);
 
     class RestoredPartsHolder;
diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp
index 66e570fdc3b..5adc1974257 100644
--- a/src/Storages/StorageMergeTree.cpp
+++ b/src/Storages/StorageMergeTree.cpp
@@ -1785,7 +1785,7 @@ void StorageMergeTree::backupData(BackupEntriesCollector & backup_entries_collec
     for (const auto & data_part : data_parts)
         min_data_version = std::min(min_data_version, data_part->info.getDataVersion());
 
-    backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup));
+    backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup, local_context));
     backup_entries_collector.addBackupEntries(backupMutations(min_data_version + 1, data_path_in_backup));
 }
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 3491f6c9d4a..740708c9320 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -8290,7 +8290,7 @@ void StorageReplicatedMergeTree::backupData(
     else
         data_parts = getVisibleDataPartsVector(local_context);
 
-    auto backup_entries = backupParts(data_parts, "");
+    auto backup_entries = backupParts(data_parts, /* data_path_in_backup */ "", local_context);
 
     auto coordination = backup_entries_collector.getBackupCoordination();
     String shared_id = getTableSharedID();
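
Note (editorial addition, not part of the patch): holding smart pointers instead of hard links is only safe because part cleanup skips parts that are still referenced. A simplified sketch of the check this relies on, in the spirit of the `part.unique()` call in grabOldParts() mentioned in the comments above (an illustration, not the actual implementation):

    // A part scheduled for deletion is collected only when nothing else references it.
    // Backup entries wrapped with the part keep its use_count > 1, so unique() stays
    // false and the part's files remain on disk until the backup finishes.
    if (part_to_delete.unique())
        parts_to_remove.push_back(part_to_delete);
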
diff --git a/tests/integration/test_backup_restore_on_cluster/test_concurrency.py b/tests/integration/test_backup_restore_on_cluster/test_concurrency.py
index 2269ccda828..dd8b6aa50da 100644
--- a/tests/integration/test_backup_restore_on_cluster/test_concurrency.py
+++ b/tests/integration/test_backup_restore_on_cluster/test_concurrency.py
@@ -29,7 +29,6 @@ def generate_cluster_def():
 
 main_configs = ["configs/backups_disk.xml", generate_cluster_def()]
 
-user_configs = ["configs/allow_database_types.xml"]
 
 nodes = []
@@ -175,11 +174,21 @@ def test_concurrent_backups_on_different_nodes():
 
 @pytest.mark.parametrize(
     "db_engine, table_engine",
-    [("Replicated", "ReplicatedMergeTree"), ("Ordinary", "MergeTree")],
+    [
+        ("Ordinary", "MergeTree"),
+        ("Atomic", "MergeTree"),
+        ("Replicated", "ReplicatedMergeTree"),
+        ("Memory", "MergeTree"),
+        ("Lazy", "Log"),
+    ],
 )
 def test_create_or_drop_tables_during_backup(db_engine, table_engine):
     if db_engine == "Replicated":
         db_engine = "Replicated('/clickhouse/path/','{shard}','{replica}')"
+
+    if db_engine == "Lazy":
+        db_engine = "Lazy(20)"
+
     if table_engine.endswith("MergeTree"):
         table_engine += " ORDER BY tuple()"
 
@@ -189,7 +198,7 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
     start_time = time.time()
     end_time = start_time + 60
 
-    def create_table():
+    def create_tables():
         while time.time() < end_time:
             node = nodes[randint(0, num_nodes - 1)]
             table_name = f"mydb.tbl{randint(1, num_nodes)}"
@@ -200,13 +209,13 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
                 f"INSERT INTO {table_name} SELECT rand32() FROM numbers(10)"
             )
 
-    def drop_table():
+    def drop_tables():
         while time.time() < end_time:
             table_name = f"mydb.tbl{randint(1, num_nodes)}"
             node = nodes[randint(0, num_nodes - 1)]
             node.query(f"DROP TABLE IF EXISTS {table_name} NO DELAY")
 
-    def rename_table():
+    def rename_tables():
         while time.time() < end_time:
             table_name1 = f"mydb.tbl{randint(1, num_nodes)}"
             table_name2 = f"mydb.tbl{randint(1, num_nodes)}"
@@ -215,7 +224,13 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
                 f"RENAME TABLE {table_name1} TO {table_name2}"
             )
 
-    def make_backup():
+    def truncate_tables():
+        while time.time() < end_time:
+            table_name = f"mydb.tbl{randint(1, num_nodes)}"
+            node = nodes[randint(0, num_nodes - 1)]
+            node.query(f"TRUNCATE TABLE IF EXISTS {table_name} NO DELAY")
+
+    def make_backups():
         ids = []
         while time.time() < end_time:
             time.sleep(
@@ -231,11 +246,12 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
     ids = []
     with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
         futures = []
-        ids_future = executor.submit(make_backup)
+        ids_future = executor.submit(make_backups)
         futures.append(ids_future)
-        futures.append(executor.submit(create_table))
-        futures.append(executor.submit(drop_table))
-        futures.append(executor.submit(rename_table))
+        futures.append(executor.submit(create_tables))
+        futures.append(executor.submit(drop_tables))
+        futures.append(executor.submit(rename_tables))
+        futures.append(executor.submit(truncate_tables))
         for future in futures:
             future.result()
         ids = ids_future.result()