From 9c847ceec9efd890f594b5b85f5bb29128e7c265 Mon Sep 17 00:00:00 2001
From: Vitaly Baranov
Date: Wed, 7 Sep 2022 01:58:03 +0200
Subject: [PATCH 1/5] No hardlinks while making backup of MergeTree in atomic database.

---
 src/Backups/BackupEntryWrappedWith.h        | 28 ++++++++++++++
 .../MergeTree/DataPartStorageOnDisk.cpp     | 37 +++++++++++++------
 .../MergeTree/DataPartStorageOnDisk.h       |  5 ++-
 src/Storages/MergeTree/IDataPartStorage.h   |  5 ++-
 src/Storages/MergeTree/MergeTreeData.cpp    | 26 +++++++++++--
 src/Storages/MergeTree/MergeTreeData.h      |  2 +-
 .../test_concurrency.py                     |  6 ++-
 7 files changed, 87 insertions(+), 22 deletions(-)
 create mode 100644 src/Backups/BackupEntryWrappedWith.h

diff --git a/src/Backups/BackupEntryWrappedWith.h b/src/Backups/BackupEntryWrappedWith.h
new file mode 100644
index 00000000000..893a88db9fd
--- /dev/null
+++ b/src/Backups/BackupEntryWrappedWith.h
@@ -0,0 +1,28 @@
+#pragma once
+
+
+namespace DB
+{
+
+/// Wraps another backup entry and a value of any type.
+template <typename T>
+class BackupEntryWrappedWith : public IBackupEntry
+{
+public:
+    BackupEntryWrappedWith(BackupEntryPtr entry_, const T & custom_value_) : entry(entry_), custom_value(custom_value_) { }
+    BackupEntryWrappedWith(BackupEntryPtr entry_, T && custom_value_) : entry(entry_), custom_value(std::move(custom_value_)) { }
+    ~BackupEntryWrappedWith() override = default;
+
+    UInt64 getSize() const override { return entry->getSize(); }
+    std::optional<UInt128> getChecksum() const override { return entry->getChecksum(); }
+    std::unique_ptr<SeekableReadBuffer> getReadBuffer() const override { return entry->getReadBuffer(); }
+    String getFilePath() const override { return entry->getFilePath(); }
+    DiskPtr tryGetDiskIfExists() const override { return entry->tryGetDiskIfExists(); }
+    DataSourceDescription getDataSourceDescription() const override { return entry->getDataSourceDescription(); }
+
+private:
+    BackupEntryPtr entry;
+    T custom_value;
+};
+
+}
diff --git a/src/Storages/MergeTree/DataPartStorageOnDisk.cpp b/src/Storages/MergeTree/DataPartStorageOnDisk.cpp
index 0154fd6e281..894eec12f0c 100644
--- a/src/Storages/MergeTree/DataPartStorageOnDisk.cpp
+++ b/src/Storages/MergeTree/DataPartStorageOnDisk.cpp
@@ -650,23 +650,31 @@ bool DataPartStorageOnDisk::shallParticipateInMerges(const IStoragePolicy & stor
 }
 
 void DataPartStorageOnDisk::backup(
-    TemporaryFilesOnDisks & temp_dirs,
     const MergeTreeDataPartChecksums & checksums,
     const NameSet & files_without_checksums,
     const String & path_in_backup,
-    BackupEntries & backup_entries) const
+    BackupEntries & backup_entries,
+    bool make_temporary_hard_links,
+    TemporaryFilesOnDisks * temp_dirs) const
 {
     fs::path part_path_on_disk = fs::path{root_path} / part_dir;
     fs::path part_path_in_backup = fs::path{path_in_backup} / part_dir;
 
     auto disk = volume->getDisk();
-    auto temp_dir_it = temp_dirs.find(disk);
-    if (temp_dir_it == temp_dirs.end())
-        temp_dir_it = temp_dirs.emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first;
-    auto temp_dir_owner = temp_dir_it->second;
-    fs::path temp_dir = temp_dir_owner->getPath();
-    fs::path temp_part_dir = temp_dir / part_path_in_backup.relative_path();
-    disk->createDirectories(temp_part_dir);
+
+    fs::path temp_part_dir;
+    std::shared_ptr<TemporaryFileOnDisk> temp_dir_owner;
+    if (make_temporary_hard_links)
+    {
+        assert(temp_dirs);
+        auto temp_dir_it = temp_dirs->find(disk);
+        if (temp_dir_it == temp_dirs->end())
+            temp_dir_it = temp_dirs->emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/")).first;
+        temp_dir_owner = temp_dir_it->second;
+        fs::path temp_dir = temp_dir_owner->getPath();
+        temp_part_dir = temp_dir / part_path_in_backup.relative_path();
+        disk->createDirectories(temp_part_dir);
+    }
 
     /// For example,
     /// part_path_in_backup = /data/test/table/0_1_1_0
@@ -683,13 +691,18 @@ void DataPartStorageOnDisk::backup(
             continue; /// Skip *.proj files - they're actually directories and will be handled.
 
         String filepath_on_disk = part_path_on_disk / filepath;
         String filepath_in_backup = part_path_in_backup / filepath;
-        String hardlink_filepath = temp_part_dir / filepath;
-        disk->createHardLink(filepath_on_disk, hardlink_filepath);
+
+        if (make_temporary_hard_links)
+        {
+            String hardlink_filepath = temp_part_dir / filepath;
+            disk->createHardLink(filepath_on_disk, hardlink_filepath);
+            filepath_on_disk = hardlink_filepath;
+        }
+
         UInt128 file_hash{checksum.file_hash.first, checksum.file_hash.second};
         backup_entries.emplace_back(
             filepath_in_backup,
-            std::make_unique<BackupEntryFromImmutableFile>(disk, hardlink_filepath, checksum.file_size, file_hash, temp_dir_owner));
+            std::make_unique<BackupEntryFromImmutableFile>(disk, filepath_on_disk, checksum.file_size, file_hash, temp_dir_owner));
     }
 
     for (const auto & filepath : files_without_checksums)
diff --git a/src/Storages/MergeTree/DataPartStorageOnDisk.h b/src/Storages/MergeTree/DataPartStorageOnDisk.h
index 5b5cff8e636..f02ef26f811 100644
--- a/src/Storages/MergeTree/DataPartStorageOnDisk.h
+++ b/src/Storages/MergeTree/DataPartStorageOnDisk.h
@@ -89,11 +89,12 @@ public:
     bool shallParticipateInMerges(const IStoragePolicy &) const override;
 
     void backup(
-        TemporaryFilesOnDisks & temp_dirs,
         const MergeTreeDataPartChecksums & checksums,
         const NameSet & files_without_checksums,
         const String & path_in_backup,
-        BackupEntries & backup_entries) const override;
+        BackupEntries & backup_entries,
+        bool make_temporary_hard_links,
+        TemporaryFilesOnDisks * temp_dirs) const override;
 
     DataPartStoragePtr freeze(
         const std::string & to,
diff --git a/src/Storages/MergeTree/IDataPartStorage.h b/src/Storages/MergeTree/IDataPartStorage.h
index 946c1c5fd47..9da8a5eae03 100644
--- a/src/Storages/MergeTree/IDataPartStorage.h
+++ b/src/Storages/MergeTree/IDataPartStorage.h
@@ -177,11 +177,12 @@ public:
     /// Also creates a new tmp_dir for internal disk (if disk is mentioned the first time).
     using TemporaryFilesOnDisks = std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>>;
     virtual void backup(
-        TemporaryFilesOnDisks & temp_dirs,
         const MergeTreeDataPartChecksums & checksums,
         const NameSet & files_without_checksums,
         const String & path_in_backup,
-        BackupEntries & backup_entries) const = 0;
+        BackupEntries & backup_entries,
+        bool make_temporary_hard_links,
+        TemporaryFilesOnDisks * temp_dirs) const = 0;
 
     /// Creates hardlinks into 'to/dir_path' for every file in data part.
     /// Callback is called after hardlinks are created, but before 'delete-on-destroy.txt' marker is removed.
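Background for the hunks above: the pre-patch code always linked every part file into a temporary directory before reading it, because a hard link pins the file's inode, so the bytes stay readable even if the part is merged away and its directory is removed mid-backup. The following standalone C++17 sketch illustrates just that mechanism with plain std::filesystem (made-up paths, not ClickHouse's IDisk API):

```cpp
#include <cassert>
#include <filesystem>
#include <fstream>
#include <string>

namespace fs = std::filesystem;

int main()
{
    fs::create_directories("data/test/table/0_1_1_0");
    std::ofstream("data/test/table/0_1_1_0/data.bin") << "rows";

    /// Link the file into a temporary directory before reading it for the backup.
    fs::create_directories("tmp/data/test/table/0_1_1_0");
    fs::create_hard_link(
        "data/test/table/0_1_1_0/data.bin",
        "tmp/data/test/table/0_1_1_0/data.bin");

    /// The part is dropped mid-backup...
    fs::remove_all("data");

    /// ...but the hard link still pins the inode, so the bytes remain readable.
    std::string contents;
    std::ifstream("tmp/data/test/table/0_1_1_0/data.bin") >> contents;
    assert(contents == "rows");

    fs::remove_all("tmp");  /// dropping the last link finally frees the blocks
}
```

The patch keeps this behavior behind make_temporary_hard_links and adds a second mode that skips the links entirely; the MergeTreeData.cpp hunk below decides which mode to use.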
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index b7b68367e98..8b077e6fb13 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -3,6 +3,7 @@
 #include
 #include
 #include
+#include <Backups/BackupEntryWrappedWith.h>
 #include
 #include
 #include
@@ -4110,26 +4111,43 @@ void MergeTreeData::backupData(BackupEntriesCollector & backup_entries_collector
     backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup));
 }
 
-BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup)
+BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup) const
 {
     BackupEntries backup_entries;
     std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>> temp_dirs;
 
+    /// Tables in atomic databases have UUIDs. When using an atomic database we don't have to create hard links to make a backup, we can just
+    /// keep smart pointers to data parts instead. That's because the files of a data part are removed only by the destructor of the data part,
+    /// so keeping a smart pointer to a data part is enough to protect those files from being deleted.
+    bool use_hard_links = !getStorageID().hasUUID();
+
     for (const auto & part : data_parts)
     {
+        BackupEntries new_backup_entries;
+
         part->data_part_storage->backup(
-            temp_dirs, part->checksums, part->getFileNamesWithoutChecksums(), data_path_in_backup, backup_entries);
+            part->checksums, part->getFileNamesWithoutChecksums(), data_path_in_backup, new_backup_entries, use_hard_links, &temp_dirs);
 
         auto projection_parts = part->getProjectionParts();
         for (const auto & [projection_name, projection_part] : projection_parts)
         {
             projection_part->data_part_storage->backup(
-                temp_dirs,
                 projection_part->checksums,
                 projection_part->getFileNamesWithoutChecksums(),
                 fs::path{data_path_in_backup} / part->name,
-                backup_entries);
+                new_backup_entries,
+                use_hard_links,
+                &temp_dirs);
         }
+
+        if (!use_hard_links)
+        {
+            /// Wrap backup entries with data parts in order to keep the data parts alive while the backup entries are in use.
+            for (auto & [_, backup_entry] : new_backup_entries)
+                backup_entry = std::make_shared<BackupEntryWrappedWith<DataPartPtr>>(std::move(backup_entry), part);
+        }
+
+        insertAtEnd(backup_entries, std::move(new_backup_entries));
     }
 
     return backup_entries;
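The use_hard_links == false branch above leans on shared-pointer ownership instead: part files are deleted only in the data part's destructor, so a backup entry that stores a DataPartPtr postpones that destructor for as long as the entry lives. A minimal standalone sketch of the idea (hypothetical simplified types, not the real IBackupEntry hierarchy):

```cpp
#include <cassert>
#include <cstdio>
#include <memory>

struct DataPart
{
    ~DataPart() { std::puts("part files are removed here"); }
};
using DataPartPtr = std::shared_ptr<const DataPart>;

/// Stand-in for BackupEntryWrappedWith<DataPartPtr>: its only job in this
/// sketch is to co-own the part.
struct WrappedBackupEntry
{
    DataPartPtr custom_value;
};

int main()
{
    std::shared_ptr<WrappedBackupEntry> entry;
    {
        DataPartPtr part = std::make_shared<const DataPart>();
        entry = std::make_shared<WrappedBackupEntry>(WrappedBackupEntry{part});
    }   /// the table's own reference goes away here

    assert(entry->custom_value.use_count() == 1);  /// the entry still pins the part
    entry.reset();  /// only now does ~DataPart run and delete the files
    std::puts("backup entry released");
}
```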
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index c91c7ba02a8..93f9e6157d8 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -1243,7 +1243,7 @@ protected:
     bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space);
 
     /// Makes backup entries to backup the parts of this table.
-    static BackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup);
+    BackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup) const;
 
     class RestoredPartsHolder;
diff --git a/tests/integration/test_backup_restore_on_cluster/test_concurrency.py b/tests/integration/test_backup_restore_on_cluster/test_concurrency.py
index 2269ccda828..315c6b94507 100644
--- a/tests/integration/test_backup_restore_on_cluster/test_concurrency.py
+++ b/tests/integration/test_backup_restore_on_cluster/test_concurrency.py
@@ -175,7 +175,11 @@ def test_concurrent_backups_on_different_nodes():
 
 @pytest.mark.parametrize(
     "db_engine, table_engine",
-    [("Replicated", "ReplicatedMergeTree"), ("Ordinary", "MergeTree")],
+    [
+        ("Ordinary", "MergeTree"),
+        ("Atomic", "MergeTree"),
+        ("Replicated", "ReplicatedMergeTree"),
+    ],
 )
 def test_create_or_drop_tables_during_backup(db_engine, table_engine):
     if db_engine == "Replicated":
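Patch 1's comment asserts that holding a DataPartPtr protects the files; that holds because part cleanup is refcount-gated, and patch 2 below cites the exact checks (part.unique() in grabOldParts(), table.unique() in DatabaseCatalog). Roughly, as a simplified sketch (not the real MergeTreeData::grabOldParts):

```cpp
#include <memory>
#include <utility>
#include <vector>

struct DataPart { /* owns the files; deletes them in its destructor */ };
using DataPartPtr = std::shared_ptr<DataPart>;

/// Simplified refcount check (the real grabOldParts does much more): an
/// outdated part is grabbed for deletion only when nobody else references it.
std::vector<DataPartPtr> grabOldParts(std::vector<DataPartPtr> & outdated_parts)
{
    std::vector<DataPartPtr> res;
    for (auto & part : outdated_parts)
        if (part.use_count() == 1)          /// i.e. part.unique(): no backup entry holds it
            res.push_back(std::move(part)); /// safe: destroying res removes the files
    return res;                             /// still-referenced parts stay on disk for now
}
```

Patch 2 then generalizes the scheme: pointer-holding remains the mode for atomic databases, while tables using zero-copy replication additionally take a shared table lock, since hard links are unsafe there.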
From 122009a2bde416447ef6cbb98ea36c95b47f2b31 Mon Sep 17 00:00:00 2001
From: Vitaly Baranov
Date: Thu, 8 Sep 2022 08:29:31 +0200
Subject: [PATCH 2/5] Use table lock if database is ordinary and zero-copy replication is enabled.

---
 src/Backups/BackupEntryWrappedWith.h        |  9 +++
 src/Storages/MergeTree/MergeTreeData.cpp    | 60 ++++++++++++++-----
 src/Storages/MergeTree/MergeTreeData.h      |  2 +-
 src/Storages/StorageMergeTree.cpp           |  2 +-
 src/Storages/StorageReplicatedMergeTree.cpp |  2 +-
 .../test_concurrency.py                     |  9 ++-
 6 files changed, 64 insertions(+), 20 deletions(-)

diff --git a/src/Backups/BackupEntryWrappedWith.h b/src/Backups/BackupEntryWrappedWith.h
index 893a88db9fd..97244650b6b 100644
--- a/src/Backups/BackupEntryWrappedWith.h
+++ b/src/Backups/BackupEntryWrappedWith.h
@@ -1,5 +1,7 @@
 #pragma once
 
+#include <Backups/IBackupEntry.h>
+
 
 namespace DB
 {
@@ -25,4 +27,11 @@ private:
     T custom_value;
 };
 
+template <typename T>
+void wrapBackupEntriesWith(std::vector<std::pair<String, BackupEntryPtr>> & backup_entries, const T & custom_value)
+{
+    for (auto & [_, backup_entry] : backup_entries)
+        backup_entry = std::make_shared<BackupEntryWrappedWith<T>>(std::move(backup_entry), custom_value);
+}
+
 }
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 8b077e6fb13..b9deeeaffb7 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -4108,25 +4108,49 @@ void MergeTreeData::backupData(BackupEntriesCollector & backup_entries_collector
     else
         data_parts = getVisibleDataPartsVector(local_context);
 
-    backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup));
+    backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup, local_context));
 }
 
-BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup) const
+BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup, const ContextPtr & local_context)
 {
     BackupEntries backup_entries;
     std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>> temp_dirs;
-
-    /// Tables in atomic databases have UUIDs. When using an atomic database we don't have to create hard links to make a backup, we can just
-    /// keep smart pointers to data parts instead. That's because the files of a data part are removed only by the destructor of the data part,
-    /// so keeping a smart pointer to a data part is enough to protect those files from being deleted.
-    bool use_hard_links = !getStorageID().hasUUID();
+    TableLockHolder table_lock;
 
     for (const auto & part : data_parts)
    {
-        BackupEntries new_backup_entries;
+        /// Hard links are the default way to ensure that we keep access to the files of parts.
+        bool make_temporary_hard_links = true;
+        bool hold_storage_and_part_ptrs = false;
+        bool hold_table_lock = false;
 
+        if (getStorageID().hasUUID())
+        {
+            /// Tables in atomic databases have UUIDs. When using an atomic database we don't have to create hard links to make a backup,
+            /// we can just hold smart pointers to a storage and to data parts instead. That's enough to protect those files from being deleted
+            /// until the backup is done (see the calls `part.unique()` in grabOldParts() and table.unique() in DatabaseCatalog).
+            make_temporary_hard_links = false;
+            hold_storage_and_part_ptrs = true;
+        }
+        else if (supportsReplication() && part->data_part_storage->supportZeroCopyReplication() && getSettings()->allow_remote_fs_zero_copy_replication)
+        {
+            /// Hard links don't work correctly with zero-copy replication.
+            make_temporary_hard_links = false;
+            hold_storage_and_part_ptrs = true;
+            hold_table_lock = true;
+        }
+
+        if (hold_table_lock && !table_lock)
+            table_lock = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
+
+        BackupEntries backup_entries_from_part;
         part->data_part_storage->backup(
-            part->checksums, part->getFileNamesWithoutChecksums(), data_path_in_backup, new_backup_entries, use_hard_links, &temp_dirs);
+            part->checksums,
+            part->getFileNamesWithoutChecksums(),
+            data_path_in_backup,
+            backup_entries_from_part,
+            make_temporary_hard_links,
+            &temp_dirs);
 
         auto projection_parts = part->getProjectionParts();
         for (const auto & [projection_name, projection_part] : projection_parts)
         {
@@ -4135,19 +4159,23 @@ BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, con
             projection_part->data_part_storage->backup(
                 projection_part->checksums,
                 projection_part->getFileNamesWithoutChecksums(),
                 fs::path{data_path_in_backup} / part->name,
-                new_backup_entries,
-                use_hard_links,
+                backup_entries_from_part,
+                make_temporary_hard_links,
                 &temp_dirs);
         }
 
-        if (!use_hard_links)
+        if (hold_storage_and_part_ptrs)
         {
-            /// Wrap backup entries with data parts in order to keep the data parts alive while the backup entries are in use.
-            for (auto & [_, backup_entry] : new_backup_entries)
-                backup_entry = std::make_shared<BackupEntryWrappedWith<DataPartPtr>>(std::move(backup_entry), part);
+            /// Wrap backup entries with smart pointers to data parts and to the storage itself
+            /// (we'll be holding those smart pointers for as long as we'll be using the backup entries).
+            auto storage_and_part = std::make_pair(shared_from_this(), part);
+            if (hold_table_lock)
+                wrapBackupEntriesWith(backup_entries_from_part, std::make_pair(storage_and_part, table_lock));
+            else
+                wrapBackupEntriesWith(backup_entries_from_part, storage_and_part);
         }
 
-        insertAtEnd(backup_entries, std::move(new_backup_entries));
+        insertAtEnd(backup_entries, std::move(backup_entries_from_part));
     }
 
     return backup_entries;
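One template covers both new modes above: custom_value may be a (storage, part) pair or that pair combined with the table lock, and every handle in the bundle is released together with the entry. A sketch with stand-in types (none of these are the real ClickHouse definitions):

```cpp
#include <memory>
#include <utility>

/// Stand-in types only.
struct IStorage {};
struct DataPart {};
struct TableLockHolder { std::shared_ptr<void> impl; };

using StoragePtr = std::shared_ptr<IStorage>;
using DataPartPtr = std::shared_ptr<DataPart>;

template <typename T>
struct BackupEntryWrappedWith
{
    T custom_value;  /// whatever T bundles is released together with the entry
};

int main()
{
    auto storage = std::make_shared<IStorage>();
    auto part = std::make_shared<DataPart>();
    TableLockHolder table_lock{std::make_shared<int>(0)};

    auto storage_and_part = std::make_pair(storage, part);

    /// Atomic database: pin the storage and the part.
    BackupEntryWrappedWith<decltype(storage_and_part)> for_atomic{storage_and_part};

    /// Ordinary database + zero-copy replication: additionally pin the table lock.
    auto with_lock = std::make_pair(storage_and_part, table_lock);
    BackupEntryWrappedWith<decltype(with_lock)> for_zero_copy{with_lock};

    (void)for_atomic;
    (void)for_zero_copy;
}
```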
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index 93f9e6157d8..824c2b41f00 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -1243,7 +1243,7 @@ protected:
     bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space);
 
     /// Makes backup entries to backup the parts of this table.
-    BackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup) const;
+    BackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup, const ContextPtr & local_context);
 
     class RestoredPartsHolder;
diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp
index 66e570fdc3b..5adc1974257 100644
--- a/src/Storages/StorageMergeTree.cpp
+++ b/src/Storages/StorageMergeTree.cpp
@@ -1785,7 +1785,7 @@ void StorageMergeTree::backupData(BackupEntriesCollector & backup_entries_collec
     for (const auto & data_part : data_parts)
         min_data_version = std::min(min_data_version, data_part->info.getDataVersion());
 
-    backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup));
+    backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup, local_context));
     backup_entries_collector.addBackupEntries(backupMutations(min_data_version + 1, data_path_in_backup));
 }
 
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 4be97e01293..d662682d228 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -8264,7 +8264,7 @@ void StorageReplicatedMergeTree::backupData(
     else
         data_parts = getVisibleDataPartsVector(local_context);
 
-    auto backup_entries = backupParts(data_parts, "");
+    auto backup_entries = backupParts(data_parts, "", local_context);
 
     auto coordination = backup_entries_collector.getBackupCoordination();
     String shared_id = getTableSharedID();
diff --git a/tests/integration/test_backup_restore_on_cluster/test_concurrency.py b/tests/integration/test_backup_restore_on_cluster/test_concurrency.py
index 315c6b94507..fa6b09a89cf 100644
--- a/tests/integration/test_backup_restore_on_cluster/test_concurrency.py
+++ b/tests/integration/test_backup_restore_on_cluster/test_concurrency.py
@@ -29,7 +29,6 @@ def generate_cluster_def():
 
 
 main_configs = ["configs/backups_disk.xml", generate_cluster_def()]
 
-user_configs = ["configs/allow_database_types.xml"]
 
 nodes = []
@@ -184,6 +183,7 @@
 def test_create_or_drop_tables_during_backup(db_engine, table_engine):
     if db_engine == "Replicated":
         db_engine = "Replicated('/clickhouse/path/','{shard}','{replica}')"
+
     if table_engine.endswith("MergeTree"):
         table_engine += " ORDER BY tuple()"
@@ -219,6 +219,12 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
                 f"RENAME TABLE {table_name1} TO {table_name2}"
             )
 
+    def truncate_table():
+        while time.time() < end_time:
+            table_name = f"mydb.tbl{randint(1, num_nodes)}"
+            node = nodes[randint(0, num_nodes - 1)]
+            node.query(f"TRUNCATE TABLE IF EXISTS {table_name} NO DELAY")
+
     def make_backup():
         ids = []
         while time.time() < end_time:
             time.sleep(
@@ -240,6 +246,7 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
         futures.append(executor.submit(create_table))
         futures.append(executor.submit(drop_table))
         futures.append(executor.submit(rename_table))
+        futures.append(executor.submit(truncate_table))
         for future in futures:
             future.result()
         ids = ids_future.result()

From 9acc73d811474b61a164debc02a9b730ffab1eb9 Mon Sep 17 00:00:00 2001
From: Vitaly Baranov
Date: Thu, 8 Sep 2022 17:51:29 +0200
Subject: [PATCH 3/5] Update src/Storages/StorageReplicatedMergeTree.cpp

Co-authored-by: Alexander Tokmakov
---
 src/Storages/StorageReplicatedMergeTree.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index d662682d228..93626f768be 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -8264,7 +8264,7 @@ void StorageReplicatedMergeTree::backupData(
     else
         data_parts = getVisibleDataPartsVector(local_context);
 
-    auto backup_entries = backupParts(data_parts, "", local_context);
+    auto backup_entries = backupParts(data_parts, /* data_path_in_backup */ "", local_context);
 
     auto coordination = backup_entries_collector.getBackupCoordination();
     String shared_id = getTableSharedID();
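Patch 3 above is purely cosmetic: it names the otherwise-opaque "" argument at the call site. The convention, shown on a generic example (hypothetical function, nothing from the codebase):

```cpp
#include <string>

static std::string backupName(const std::string & base, bool add_timestamp, int keep_days)
{
    return base + (add_timestamp ? "_ts" : "") + "_" + std::to_string(keep_days);
}

int main()
{
    /// Opaque at the call site: what do true and 7 mean here?
    auto a = backupName("daily", true, 7);

    /// Same call, self-documenting, and identical after compilation:
    auto b = backupName(/* base */ "daily", /* add_timestamp */ true, /* keep_days */ 7);

    return a == b ? 0 : 1;
}
```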
From e48cd2b6f6d70236d6b7b66dc47c1cb03d5d8488 Mon Sep 17 00:00:00 2001
From: Vitaly Baranov
Date: Thu, 8 Sep 2022 18:13:28 +0200
Subject: [PATCH 4/5] Add more test cases.

---
 .../test_concurrency.py | 25 +++++++++++--------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/tests/integration/test_backup_restore_on_cluster/test_concurrency.py b/tests/integration/test_backup_restore_on_cluster/test_concurrency.py
index fa6b09a89cf..6882cbff5e2 100644
--- a/tests/integration/test_backup_restore_on_cluster/test_concurrency.py
+++ b/tests/integration/test_backup_restore_on_cluster/test_concurrency.py
@@ -178,12 +178,17 @@ def test_concurrent_backups_on_different_nodes():
         ("Ordinary", "MergeTree"),
         ("Atomic", "MergeTree"),
         ("Replicated", "ReplicatedMergeTree"),
+        ("Memory", "MergeTree"),
+        ("Lazy", "Log")
     ],
 )
 def test_create_or_drop_tables_during_backup(db_engine, table_engine):
     if db_engine == "Replicated":
         db_engine = "Replicated('/clickhouse/path/','{shard}','{replica}')"
 
+    if db_engine == "Lazy":
+        db_engine = "Lazy(20)"
+
     if table_engine.endswith("MergeTree"):
         table_engine += " ORDER BY tuple()"
 
@@ -193,7 +198,7 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
     start_time = time.time()
     end_time = start_time + 60
 
-    def create_table():
+    def create_tables():
         while time.time() < end_time:
             node = nodes[randint(0, num_nodes - 1)]
             table_name = f"mydb.tbl{randint(1, num_nodes)}"
@@ -204,13 +209,13 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
                 f"INSERT INTO {table_name} SELECT rand32() FROM numbers(10)"
             )
 
-    def drop_table():
+    def drop_tables():
         while time.time() < end_time:
             table_name = f"mydb.tbl{randint(1, num_nodes)}"
             node = nodes[randint(0, num_nodes - 1)]
             node.query(f"DROP TABLE IF EXISTS {table_name} NO DELAY")
 
-    def rename_table():
+    def rename_tables():
         while time.time() < end_time:
             table_name1 = f"mydb.tbl{randint(1, num_nodes)}"
             table_name2 = f"mydb.tbl{randint(1, num_nodes)}"
             node = nodes[randint(0, num_nodes - 1)]
@@ -219,13 +224,13 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
                 f"RENAME TABLE {table_name1} TO {table_name2}"
             )
 
-    def truncate_table():
+    def truncate_tables():
         while time.time() < end_time:
             table_name = f"mydb.tbl{randint(1, num_nodes)}"
             node = nodes[randint(0, num_nodes - 1)]
             node.query(f"TRUNCATE TABLE IF EXISTS {table_name} NO DELAY")
 
-    def make_backup():
+    def make_backups():
         ids = []
         while time.time() < end_time:
             time.sleep(
@@ -246,12 +251,12 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
     ids = []
     with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
         futures = []
-        ids_future = executor.submit(make_backup)
+        ids_future = executor.submit(make_backups)
         futures.append(ids_future)
-        futures.append(executor.submit(create_table))
-        futures.append(executor.submit(drop_table))
-        futures.append(executor.submit(rename_table))
-        futures.append(executor.submit(truncate_table))
+        futures.append(executor.submit(create_tables))
+        futures.append(executor.submit(drop_tables))
+        futures.append(executor.submit(rename_tables))
+        futures.append(executor.submit(truncate_tables))
         for future in futures:
             future.result()
         ids = ids_future.result()

From 10629a66e582c7a2250b01d7a1e4853223dcd631 Mon Sep 17 00:00:00 2001
From: Vitaly Baranov
Date: Thu, 8 Sep 2022 20:58:51 +0200
Subject: [PATCH 5/5] Fix black.

---
 .../test_backup_restore_on_cluster/test_concurrency.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/integration/test_backup_restore_on_cluster/test_concurrency.py b/tests/integration/test_backup_restore_on_cluster/test_concurrency.py
index 6882cbff5e2..dd8b6aa50da 100644
--- a/tests/integration/test_backup_restore_on_cluster/test_concurrency.py
+++ b/tests/integration/test_backup_restore_on_cluster/test_concurrency.py
@@ -179,7 +179,7 @@ def test_concurrent_backups_on_different_nodes():
         ("Atomic", "MergeTree"),
         ("Replicated", "ReplicatedMergeTree"),
         ("Memory", "MergeTree"),
-        ("Lazy", "Log")
+        ("Lazy", "Log"),
     ],
 )
 def test_create_or_drop_tables_during_backup(db_engine, table_engine):
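The integration test extended above follows one pattern throughout: a fixed deadline plus one looping worker per DDL kind, all joined at the end so that a failure in any worker fails the test. A compact C++ rendering of the same pattern (worker bodies are placeholders; note that unlike Python's future.result(), std::thread::join() does not propagate exceptions, so real code would need its own error channel):

```cpp
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>

int main()
{
    using clock = std::chrono::steady_clock;
    const auto end_time = clock::now() + std::chrono::seconds(60);

    std::atomic<int> backups_done{0};

    /// One looping worker per concurrent activity, all bounded by one deadline.
    auto race_until_deadline = [&](auto action)
    {
        return std::thread([&, action]
        {
            while (clock::now() < end_time)
                action();
        });
    };

    std::vector<std::thread> workers;
    workers.push_back(race_until_deadline([] { /* CREATE TABLE ... */ }));
    workers.push_back(race_until_deadline([] { /* DROP TABLE IF EXISTS ... */ }));
    workers.push_back(race_until_deadline([] { /* RENAME TABLE ... */ }));
    workers.push_back(race_until_deadline([] { /* TRUNCATE TABLE ... */ }));
    workers.push_back(race_until_deadline([&] { /* BACKUP ... */ ++backups_done; }));

    for (auto & worker : workers)
        worker.join();  /// wait for the deadline to expire on every worker

    return backups_done.load() > 0 ? 0 : 1;
}
```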