Use a table lock if the database is ordinary and zero-copy replication is enabled.

Vitaly Baranov 2022-09-08 08:29:31 +02:00
parent 9c847ceec9
commit 122009a2bd
6 changed files with 64 additions and 20 deletions

View File

@@ -1,5 +1,7 @@
 #pragma once

 #include <Backups/IBackupEntry.h>

 namespace DB
 {
@@ -25,4 +27,11 @@ private:
     T custom_value;
 };

+template <typename T>
+void wrapBackupEntriesWith(std::vector<std::pair<String, BackupEntryPtr>> & backup_entries, const T & custom_value)
+{
+    for (auto & [_, backup_entry] : backup_entries)
+        backup_entry = std::make_shared<BackupEntryWrappedWith<T>>(std::move(backup_entry), custom_value);
+}
+
 }
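
For illustration, here is a minimal, self-contained sketch of the lifetime-extension idea behind BackupEntryWrappedWith and wrapBackupEntriesWith, using simplified stand-in types rather than the real ClickHouse interfaces: each backup entry is replaced by a wrapper that co-owns an arbitrary value, so that value cannot be destroyed while the entry is still in use.

    #include <iostream>
    #include <memory>
    #include <string>
    #include <utility>
    #include <vector>

    // Simplified stand-in for IBackupEntry.
    struct IEntry
    {
        virtual ~IEntry() = default;
        virtual std::string filePath() const = 0;
    };
    using EntryPtr = std::shared_ptr<const IEntry>;

    struct FileEntry : IEntry
    {
        explicit FileEntry(std::string path_) : path(std::move(path_)) {}
        std::string filePath() const override { return path; }
        std::string path;
    };

    // Forwards to the wrapped entry while also owning custom_value
    // (e.g. a data-part pointer or a table lock), so the value lives
    // exactly as long as the entry does.
    template <typename T>
    struct EntryWrappedWith : IEntry
    {
        EntryWrappedWith(EntryPtr entry_, T custom_value_)
            : entry(std::move(entry_)), custom_value(std::move(custom_value_)) {}
        std::string filePath() const override { return entry->filePath(); }
        EntryPtr entry;
        T custom_value;
    };

    template <typename T>
    void wrapEntriesWith(std::vector<std::pair<std::string, EntryPtr>> & entries, const T & custom_value)
    {
        for (auto & [_, entry] : entries)
            entry = std::make_shared<EntryWrappedWith<T>>(std::move(entry), custom_value);
    }

    int main()
    {
        auto lock = std::make_shared<int>(0);  // stand-in for a TableLockHolder
        std::vector<std::pair<std::string, EntryPtr>> entries;
        entries.emplace_back("data.bin", std::make_shared<FileEntry>("store/all_1_1_0/data.bin"));

        wrapEntriesWith(entries, lock);
        // The wrapper co-owns the lock: use_count stays at 2 until the entry is released.
        std::cout << entries[0].second->filePath() << " use_count=" << lock.use_count() << "\n";
    }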

View File

@@ -4108,25 +4108,49 @@ void MergeTreeData::backupData(BackupEntriesCollector & backup_entries_collector
     else
         data_parts = getVisibleDataPartsVector(local_context);
-    backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup));
+    backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup, local_context));
 }

-BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup) const
+BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup, const ContextPtr & local_context)
 {
     BackupEntries backup_entries;
     std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>> temp_dirs;

-    /// Tables in atomic databases have UUIDs. When using an atomic database we don't have to create hard links to make a backup, we can
-    /// just keep smart pointers to data parts instead. That's because the files of a data part are removed only by the destructor of the
-    /// data part, so keeping a smart pointer to a data part is enough to protect those files from being deleted.
-    bool use_hard_links = !getStorageID().hasUUID();
+    TableLockHolder table_lock;

     for (const auto & part : data_parts)
     {
-        BackupEntries new_backup_entries;
+        /// Hard links are the default way to ensure that we keep access to the files of parts.
+        bool make_temporary_hard_links = true;
+        bool hold_storage_and_part_ptrs = false;
+        bool hold_table_lock = false;
+
+        if (getStorageID().hasUUID())
+        {
+            /// Tables in atomic databases have UUIDs. When using an atomic database we don't have to create hard links to make a backup,
+            /// we can just hold smart pointers to the storage and to the data parts instead. That's enough to protect those files from
+            /// being deleted until the backup is done (see the calls to `part.unique()` in grabOldParts() and `table.unique()` in DatabaseCatalog).
+            make_temporary_hard_links = false;
+            hold_storage_and_part_ptrs = true;
+        }
+        else if (supportsReplication() && part->data_part_storage->supportZeroCopyReplication() && getSettings()->allow_remote_fs_zero_copy_replication)
+        {
+            /// Hard links don't work correctly with zero-copy replication.
+            make_temporary_hard_links = false;
+            hold_storage_and_part_ptrs = true;
+            hold_table_lock = true;
+        }
+
+        if (hold_table_lock && !table_lock)
+            table_lock = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
+
+        BackupEntries backup_entries_from_part;
         part->data_part_storage->backup(
-            part->checksums, part->getFileNamesWithoutChecksums(), data_path_in_backup, new_backup_entries, use_hard_links, &temp_dirs);
+            part->checksums,
+            part->getFileNamesWithoutChecksums(),
+            data_path_in_backup,
+            backup_entries_from_part,
+            make_temporary_hard_links,
+            &temp_dirs);

         auto projection_parts = part->getProjectionParts();
         for (const auto & [projection_name, projection_part] : projection_parts)
@@ -4135,19 +4159,23 @@ BackupEntries MergeTreeData::backupParts(const DataPartsVector & data_parts, con
                 projection_part->checksums,
                 projection_part->getFileNamesWithoutChecksums(),
                 fs::path{data_path_in_backup} / part->name,
-                new_backup_entries,
-                use_hard_links,
+                backup_entries_from_part,
+                make_temporary_hard_links,
                 &temp_dirs);
         }

-        if (!use_hard_links)
+        if (hold_storage_and_part_ptrs)
         {
-            /// Wrap the backup entries with data parts in order to keep the data parts alive while the backup entries are in use.
-            for (auto & [_, backup_entry] : new_backup_entries)
-                backup_entry = std::make_shared<BackupEntryWrappedWith<DataPartPtr>>(std::move(backup_entry), part);
+            /// Wrap the backup entries with smart pointers to the data parts and to the storage itself
+            /// (we'll be holding those smart pointers for as long as we're using the backup entries).
+            auto storage_and_part = std::make_pair(shared_from_this(), part);
+            if (hold_table_lock)
+                wrapBackupEntriesWith(backup_entries_from_part, std::make_pair(storage_and_part, table_lock));
+            else
+                wrapBackupEntriesWith(backup_entries_from_part, storage_and_part);
         }

-        insertAtEnd(backup_entries, std::move(new_backup_entries));
+        insertAtEnd(backup_entries, std::move(backup_entries_from_part));
     }

     return backup_entries;
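
Condensed, the per-part policy above has three outcomes. The following standalone sketch restates the branch; the plain bool inputs are a simplification, as the real checks consult the StorageID, the part's storage, and the merge-tree settings.

    #include <cstdio>

    // The three mechanisms a backup can use to keep part files alive.
    struct BackupPolicy
    {
        bool make_temporary_hard_links;   // default: pin part files via hard links
        bool hold_storage_and_part_ptrs;  // pin files via shared_ptr ownership
        bool hold_table_lock;             // additionally block concurrent table drops
    };

    BackupPolicy choosePolicy(bool table_has_uuid, bool zero_copy_replication)
    {
        if (table_has_uuid)
            return {false, true, false};  // atomic database: smart pointers suffice
        if (zero_copy_replication)
            return {false, true, true};   // ordinary DB with zero-copy: also lock the table
        return {true, false, false};      // everything else: temporary hard links
    }

    int main()
    {
        // The case this commit fixes: ordinary database, zero-copy replication enabled.
        BackupPolicy policy = choosePolicy(false, true);
        std::printf("hard_links=%d hold_ptrs=%d table_lock=%d\n",
                    policy.make_temporary_hard_links,
                    policy.hold_storage_and_part_ptrs,
                    policy.hold_table_lock);
    }

Note that the lock is acquired lazily and only once: the guard `if (hold_table_lock && !table_lock)` shares a single TableLockHolder across all parts of the table.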

View File

@@ -1243,7 +1243,7 @@ protected:
     bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space);

     /// Makes backup entries to back up the parts of this table.
-    BackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup) const;
+    BackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup, const ContextPtr & local_context);

     class RestoredPartsHolder;
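
A likely reason backupParts loses its const qualifier here: holding the storage pointer requires shared_from_this(), which in a const member function returns a pointer-to-const, and lockForShare acquires a lock on the table, which is not a const operation. A small sketch of the shared_from_this() part of that constraint (illustrative only, standard library types):

    #include <memory>

    // In a const member function shared_from_this() returns
    // std::shared_ptr<const T>, not std::shared_ptr<T>.
    struct Storage : std::enable_shared_from_this<Storage>
    {
        std::shared_ptr<const Storage> holdFromConst() const { return shared_from_this(); }
        std::shared_ptr<Storage> holdFromNonConst() { return shared_from_this(); }
    };

    int main()
    {
        auto storage = std::make_shared<Storage>();
        std::shared_ptr<const Storage> c = storage->holdFromConst();
        std::shared_ptr<Storage> n = storage->holdFromNonConst();
        (void)c;
        (void)n;
    }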

View File

@@ -1785,7 +1785,7 @@ void StorageMergeTree::backupData(BackupEntriesCollector & backup_entries_collec
     for (const auto & data_part : data_parts)
         min_data_version = std::min(min_data_version, data_part->info.getDataVersion());

-    backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup));
+    backup_entries_collector.addBackupEntries(backupParts(data_parts, data_path_in_backup, local_context));
     backup_entries_collector.addBackupEntries(backupMutations(min_data_version + 1, data_path_in_backup));
 }

View File

@@ -8264,7 +8264,7 @@ void StorageReplicatedMergeTree::backupData(
     else
         data_parts = getVisibleDataPartsVector(local_context);

-    auto backup_entries = backupParts(data_parts, "");
+    auto backup_entries = backupParts(data_parts, "", local_context);

     auto coordination = backup_entries_collector.getBackupCoordination();
     String shared_id = getTableSharedID();

View File

@@ -29,7 +29,6 @@ def generate_cluster_def():
 main_configs = ["configs/backups_disk.xml", generate_cluster_def()]

 user_configs = ["configs/allow_database_types.xml"]

 nodes = []
@@ -184,6 +183,7 @@ def test_concurrent_backups_on_different_nodes():
 def test_create_or_drop_tables_during_backup(db_engine, table_engine):
     if db_engine == "Replicated":
         db_engine = "Replicated('/clickhouse/path/','{shard}','{replica}')"

     if table_engine.endswith("MergeTree"):
         table_engine += " ORDER BY tuple()"
@@ -219,6 +219,12 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
                 f"RENAME TABLE {table_name1} TO {table_name2}"
             )

+    def truncate_table():
+        while time.time() < end_time:
+            table_name = f"mydb.tbl{randint(1, num_nodes)}"
+            node = nodes[randint(0, num_nodes - 1)]
+            node.query(f"TRUNCATE TABLE IF EXISTS {table_name} NO DELAY")
+
     def make_backup():
         ids = []
         while time.time() < end_time:
@@ -240,6 +246,7 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
         futures.append(executor.submit(create_table))
         futures.append(executor.submit(drop_table))
         futures.append(executor.submit(rename_table))
+        futures.append(executor.submit(truncate_table))
         for future in futures:
             future.result()
         ids = ids_future.result()