From 93530e8d3457138cfaa359f8fc175f8c4c87737a Mon Sep 17 00:00:00 2001 From: Smita Kulkarni Date: Mon, 9 Jan 2023 18:14:39 +0100 Subject: [PATCH 1/6] Added settings to disallow concurrent backups and restores Implementation: * Added server level settings to disallow concurrent backups and restores, which are read and set when BackupWorker is created in Context. * Settings are set to true by default. * Before starting backup or restores, added a check to see if any other backups/restores are running (except internal ones). Testing: * Added a test test_backup_and_restore_on_cluster/test_disallow_concurrency. --- src/Backups/BackupsWorker.cpp | 11 +- src/Backups/BackupsWorker.h | 4 +- src/Interpreters/Context.cpp | 5 +- .../configs/disallow_concurrency.xml | 15 ++ .../test_disallow_concurrency.py | 195 ++++++++++++++++++ 5 files changed, 227 insertions(+), 3 deletions(-) create mode 100644 tests/integration/test_backup_restore_on_cluster/configs/disallow_concurrency.xml create mode 100644 tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py diff --git a/src/Backups/BackupsWorker.cpp b/src/Backups/BackupsWorker.cpp index 267400ce66d..00106ad0390 100644 --- a/src/Backups/BackupsWorker.cpp +++ b/src/Backups/BackupsWorker.cpp @@ -30,6 +30,7 @@ namespace ErrorCodes { extern const int BAD_ARGUMENTS; extern const int LOGICAL_ERROR; + extern const int CONCURRENT_ACCESS_NOT_SUPPORTED; } using OperationID = BackupsWorker::OperationID; @@ -121,10 +122,12 @@ namespace } -BackupsWorker::BackupsWorker(size_t num_backup_threads, size_t num_restore_threads) +BackupsWorker::BackupsWorker(size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_) : backups_thread_pool(num_backup_threads, /* max_free_threads = */ 0, num_backup_threads) , restores_thread_pool(num_restore_threads, /* max_free_threads = */ 0, num_restore_threads) , log(&Poco::Logger::get("BackupsWorker")) + , allow_concurrent_backups(allow_concurrent_backups_) + , allow_concurrent_restores(allow_concurrent_restores_) { /// We set max_free_threads = 0 because we don't want to keep any threads if there is no BACKUP or RESTORE query running right now. } @@ -157,6 +160,9 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context else backup_id = toString(*backup_settings.backup_uuid); + if (!backup_settings.internal && (num_active_backups && !allow_concurrent_backups)) + throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent backups not supported, turn on setting 'allow_concurrent_backups'"); + std::shared_ptr backup_coordination; if (backup_settings.internal) { @@ -379,6 +385,9 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt else restore_id = toString(UUIDHelpers::generateV4()); + if (!restore_settings.internal && (num_active_restores && !allow_concurrent_restores)) + throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent restores not supported, turn on setting 'allow_concurrent_restores'"); + std::shared_ptr restore_coordination; if (restore_settings.internal) { diff --git a/src/Backups/BackupsWorker.h b/src/Backups/BackupsWorker.h index ab99691c0bc..a0a28b89d3a 100644 --- a/src/Backups/BackupsWorker.h +++ b/src/Backups/BackupsWorker.h @@ -23,7 +23,7 @@ class IRestoreCoordination; class BackupsWorker { public: - BackupsWorker(size_t num_backup_threads, size_t num_restore_threads); + BackupsWorker(size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_); /// Waits until all tasks have been completed. void shutdown(); @@ -113,6 +113,8 @@ private: std::atomic num_active_restores = 0; mutable std::mutex infos_mutex; Poco::Logger * log; + const bool allow_concurrent_backups; + const bool allow_concurrent_restores; }; } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index f97fd422662..33f36afb440 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -1852,8 +1852,11 @@ BackupsWorker & Context::getBackupsWorker() const { auto lock = getLock(); + const bool allow_concurrent_backups = this->getConfigRef().getBool("allow_concurrent_backups", true); + const bool allow_concurrent_restores = this->getConfigRef().getBool("allow_concurrent_restores", true); + if (!shared->backups_worker) - shared->backups_worker.emplace(getSettingsRef().backup_threads, getSettingsRef().restore_threads); + shared->backups_worker.emplace(getSettingsRef().backup_threads, getSettingsRef().restore_threads, allow_concurrent_backups, allow_concurrent_restores); return *shared->backups_worker; } diff --git a/tests/integration/test_backup_restore_on_cluster/configs/disallow_concurrency.xml b/tests/integration/test_backup_restore_on_cluster/configs/disallow_concurrency.xml new file mode 100644 index 00000000000..144be77c9f9 --- /dev/null +++ b/tests/integration/test_backup_restore_on_cluster/configs/disallow_concurrency.xml @@ -0,0 +1,15 @@ + + + + + local + /backups/ + + + + + backups + + false + false + diff --git a/tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py b/tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py new file mode 100644 index 00000000000..dc0ce5ead41 --- /dev/null +++ b/tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py @@ -0,0 +1,195 @@ +from random import randint +import pytest +import os.path +import time +import concurrent +from helpers.cluster import ClickHouseCluster +from helpers.test_tools import TSV, assert_eq_with_retry + + +cluster = ClickHouseCluster(__file__) + +num_nodes = 10 + + +def generate_cluster_def(): + path = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + "./_gen/cluster_for_concurrency_test.xml", + ) + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "w") as f: + f.write("\n\t\n\t\t\n\t\t\t\n") + for i in range(num_nodes): + f.write( + f"\t\t\t\t\n\t\t\t\t\tnode{i}\n\t\t\t\t\t9000\n\t\t\t\t\n" + ) + f.write("\t\t\t\n\t\t\n\t\n") + return path + + +main_configs = ["configs/disallow_concurrency.xml", generate_cluster_def()] +user_configs = ["configs/allow_database_types.xml"] + +nodes = [] +for i in range(num_nodes): + nodes.append( + cluster.add_instance( + f"node{i}", + main_configs=main_configs, + user_configs=user_configs, + external_dirs=["/backups/"], + macros={"replica": f"node{i}", "shard": "shard1"}, + with_zookeeper=True, + ) + ) + +node0 = nodes[0] + + +@pytest.fixture(scope="module", autouse=True) +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +@pytest.fixture(autouse=True) +def drop_after_test(): + try: + yield + finally: + node0.query("DROP TABLE IF EXISTS tbl ON CLUSTER 'cluster' NO DELAY") + node0.query("DROP DATABASE IF EXISTS mydb ON CLUSTER 'cluster' NO DELAY") + + +backup_id_counter = 0 + + +def new_backup_name(): + global backup_id_counter + backup_id_counter += 1 + return f"Disk('backups', '{backup_id_counter}')" + + +def create_and_fill_table(): + node0.query( + "CREATE TABLE tbl ON CLUSTER 'cluster' (" + "x UInt64" + ") ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')" + "ORDER BY x" + ) + for i in range(num_nodes): + nodes[i].query(f"INSERT INTO tbl SELECT number FROM numbers(40000000)") + + +# All the tests have concurrent backup/restores with same backup names +# The same works with different backup names too. Since concurrency +# check comes before backup name check, separate tests are not added for different names + +def test_concurrent_backups_on_same_node(): + create_and_fill_table() + + backup_name = new_backup_name() + + id = nodes[0].query( + f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC" + ).split("\t")[0] + assert_eq_with_retry( + nodes[0], + f"SELECT status FROM system.backups WHERE status == 'CREATING_BACKUP' AND id = '{id}'", + "CREATING_BACKUP", + ) + assert "Concurrent backups not supported" in nodes[0].query_and_get_error( + f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name}") + + assert_eq_with_retry( + nodes[0], + f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'", + "BACKUP_CREATED", + ) + + # This restore part is added to confirm creating an internal backup & restore work + # even when a concurrent backup is stopped + nodes[0].query(f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY") + nodes[0].query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}") + nodes[0].query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' tbl") + + +def test_concurrent_backups_on_different_nodes(): + create_and_fill_table() + + backup_name = new_backup_name() + + nodes[1].query( + f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC" + ) + assert_eq_with_retry( + nodes[1], + f"SELECT status FROM system.backups WHERE status == 'CREATING_BACKUP'", + "CREATING_BACKUP", + ) + assert "Concurrent backups not supported" in nodes[2].query_and_get_error( + f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name}") + +def test_concurrent_restores_on_same_node(): + create_and_fill_table() + + backup_name = new_backup_name() + + id = nodes[0].query( + f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC" + ).split("\t")[0] + assert_eq_with_retry( + nodes[0], + f"SELECT status FROM system.backups WHERE status == 'CREATING_BACKUP' AND id = '{id}'", + "CREATING_BACKUP", + ) + + assert_eq_with_retry( + nodes[0], + f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'", + "BACKUP_CREATED", + ) + + nodes[0].query(f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY") + nodes[0].query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name} ASYNC") + assert_eq_with_retry( + nodes[0], + f"SELECT status FROM system.backups WHERE status == 'RESTORING'", + "RESTORING", + ) + assert "Concurrent restores not supported" in nodes[0].query_and_get_error( + f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}") + +def test_concurrent_restores_on_different_node(): + create_and_fill_table() + + backup_name = new_backup_name() + + id = nodes[0].query( + f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC" + ).split("\t")[0] + assert_eq_with_retry( + nodes[0], + f"SELECT status FROM system.backups WHERE status == 'CREATING_BACKUP' AND id = '{id}'", + "CREATING_BACKUP", + ) + + assert_eq_with_retry( + nodes[0], + f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'", + "BACKUP_CREATED", + ) + + nodes[0].query(f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY") + nodes[0].query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name} ASYNC") + assert_eq_with_retry( + nodes[0], + f"SELECT status FROM system.backups WHERE status == 'RESTORING'", + "RESTORING", + ) + assert "Concurrent restores not supported" in nodes[1].query_and_get_error( + f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}") \ No newline at end of file From 46b21629edcebd82de250e8a0797c9c8d28a5e8d Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Mon, 9 Jan 2023 17:23:02 +0000 Subject: [PATCH 2/6] Automatic style fix --- .../test_disallow_concurrency.py | 43 ++++++++++++------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py b/tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py index dc0ce5ead41..edf92c69171 100644 --- a/tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py +++ b/tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py @@ -89,21 +89,25 @@ def create_and_fill_table(): # The same works with different backup names too. Since concurrency # check comes before backup name check, separate tests are not added for different names + def test_concurrent_backups_on_same_node(): create_and_fill_table() backup_name = new_backup_name() - id = nodes[0].query( - f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC" - ).split("\t")[0] + id = ( + nodes[0] + .query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC") + .split("\t")[0] + ) assert_eq_with_retry( nodes[0], f"SELECT status FROM system.backups WHERE status == 'CREATING_BACKUP' AND id = '{id}'", "CREATING_BACKUP", ) assert "Concurrent backups not supported" in nodes[0].query_and_get_error( - f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name}") + f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name}" + ) assert_eq_with_retry( nodes[0], @@ -123,25 +127,27 @@ def test_concurrent_backups_on_different_nodes(): backup_name = new_backup_name() - nodes[1].query( - f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC" - ) + nodes[1].query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC") assert_eq_with_retry( nodes[1], f"SELECT status FROM system.backups WHERE status == 'CREATING_BACKUP'", "CREATING_BACKUP", ) assert "Concurrent backups not supported" in nodes[2].query_and_get_error( - f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name}") + f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name}" + ) + def test_concurrent_restores_on_same_node(): create_and_fill_table() backup_name = new_backup_name() - id = nodes[0].query( - f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC" - ).split("\t")[0] + id = ( + nodes[0] + .query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC") + .split("\t")[0] + ) assert_eq_with_retry( nodes[0], f"SELECT status FROM system.backups WHERE status == 'CREATING_BACKUP' AND id = '{id}'", @@ -162,16 +168,20 @@ def test_concurrent_restores_on_same_node(): "RESTORING", ) assert "Concurrent restores not supported" in nodes[0].query_and_get_error( - f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}") + f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}" + ) + def test_concurrent_restores_on_different_node(): create_and_fill_table() backup_name = new_backup_name() - id = nodes[0].query( - f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC" - ).split("\t")[0] + id = ( + nodes[0] + .query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC") + .split("\t")[0] + ) assert_eq_with_retry( nodes[0], f"SELECT status FROM system.backups WHERE status == 'CREATING_BACKUP' AND id = '{id}'", @@ -192,4 +202,5 @@ def test_concurrent_restores_on_different_node(): "RESTORING", ) assert "Concurrent restores not supported" in nodes[1].query_and_get_error( - f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}") \ No newline at end of file + f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}" + ) From 6e06af1b25e8d7cb7ac3a88cb19023f00f9ea4a2 Mon Sep 17 00:00:00 2001 From: Smita Kulkarni Date: Tue, 17 Jan 2023 22:27:13 +0100 Subject: [PATCH 3/6] Updated strategy for handling internal backups & restores to avoid concurrent internal backups & restores - Added settings to disallow concurrent backups and restores --- src/Backups/BackupSettings.cpp | 44 +--------------- src/Backups/BackupsWorker.cpp | 52 +++++++++++++++++-- src/Backups/BackupsWorker.h | 2 + src/Backups/RestoreSettings.cpp | 5 +- src/Backups/RestoreSettings.h | 5 ++ src/Backups/SettingsFieldOptionalUUID.cpp | 43 +++++++++++++++ src/Backups/SettingsFieldOptionalUUID.h | 18 +++++++ .../test_disallow_concurrency.py | 23 ++++++-- 8 files changed, 138 insertions(+), 54 deletions(-) create mode 100644 src/Backups/SettingsFieldOptionalUUID.cpp create mode 100644 src/Backups/SettingsFieldOptionalUUID.h diff --git a/src/Backups/BackupSettings.cpp b/src/Backups/BackupSettings.cpp index 8c54b29141a..747e7bb0c1f 100644 --- a/src/Backups/BackupSettings.cpp +++ b/src/Backups/BackupSettings.cpp @@ -6,7 +6,7 @@ #include #include #include - +#include namespace DB { @@ -16,48 +16,6 @@ namespace ErrorCodes extern const int WRONG_BACKUP_SETTINGS; } - -namespace -{ - struct SettingFieldOptionalUUID - { - std::optional value; - - explicit SettingFieldOptionalUUID(const std::optional & value_) : value(value_) {} - - explicit SettingFieldOptionalUUID(const Field & field) - { - if (field.getType() == Field::Types::Null) - { - value = std::nullopt; - return; - } - - if (field.getType() == Field::Types::String) - { - const String & str = field.get(); - if (str.empty()) - { - value = std::nullopt; - return; - } - - UUID id; - if (tryParse(id, str)) - { - value = id; - return; - } - } - - throw Exception(ErrorCodes::CANNOT_PARSE_BACKUP_SETTINGS, "Cannot parse uuid from {}", field); - } - - explicit operator Field() const { return Field(value ? toString(*value) : ""); } - }; -} - - /// List of backup settings except base_backup_name and cluster_host_ids. #define LIST_OF_BACKUP_SETTINGS(M) \ M(String, id) \ diff --git a/src/Backups/BackupsWorker.cpp b/src/Backups/BackupsWorker.cpp index 00106ad0390..ff68c065f50 100644 --- a/src/Backups/BackupsWorker.cpp +++ b/src/Backups/BackupsWorker.cpp @@ -160,8 +160,15 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context else backup_id = toString(*backup_settings.backup_uuid); - if (!backup_settings.internal && (num_active_backups && !allow_concurrent_backups)) - throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent backups not supported, turn on setting 'allow_concurrent_backups'"); + /// Check if there are no concurrent backups + if (num_active_backups && !allow_concurrent_backups) + { + /// If its an internal backup and we currently have 1 active backup, it could be the original query, validate using backup_uuid + if(!(num_active_backups==1 && backup_settings.internal && getAllActiveBackupInfos().at(0).id == toString(*backup_settings.backup_uuid))) + { + throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent backups not supported, turn on setting 'allow_concurrent_backups'"); + } + } std::shared_ptr backup_coordination; if (backup_settings.internal) @@ -376,6 +383,9 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt auto restore_query = std::static_pointer_cast(query->clone()); auto restore_settings = RestoreSettings::fromRestoreQuery(*restore_query); + if (!restore_settings.backup_uuid) + restore_settings.backup_uuid = UUIDHelpers::generateV4(); + /// `restore_id` will be used as a key to the `infos` map, so it should be unique. OperationID restore_id; if (restore_settings.internal) @@ -383,10 +393,17 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt else if (!restore_settings.id.empty()) restore_id = restore_settings.id; else - restore_id = toString(UUIDHelpers::generateV4()); + restore_id = toString(*restore_settings.backup_uuid); - if (!restore_settings.internal && (num_active_restores && !allow_concurrent_restores)) - throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent restores not supported, turn on setting 'allow_concurrent_restores'"); + /// Check if there are no concurrent restores + if (num_active_restores && !allow_concurrent_restores) + { + /// If its an internal restore and we currently have 1 active restore, it could be the original query, validate using iz + if(!(num_active_restores==1 && restore_settings.internal && getAllActiveRestoreInfos().at(0).id == toString(*restore_settings.backup_uuid))) + { + throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent restores not supported, turn on setting 'allow_concurrent_restores'"); + } + } std::shared_ptr restore_coordination; if (restore_settings.internal) @@ -480,6 +497,7 @@ void BackupsWorker::doRestore( backup_open_params.context = context; backup_open_params.backup_info = backup_info; backup_open_params.base_backup_info = restore_settings.base_backup_info; + backup_open_params.backup_uuid = restore_settings.backup_uuid; backup_open_params.password = restore_settings.password; BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params); @@ -696,6 +714,30 @@ std::vector BackupsWorker::getAllInfos() const return res_infos; } +std::vector BackupsWorker::getAllActiveBackupInfos() const +{ + std::vector res_infos; + std::lock_guard lock{infos_mutex}; + for (const auto & info : infos | boost::adaptors::map_values) + { + if (info.status==BackupStatus::CREATING_BACKUP) + res_infos.push_back(info); + } + return res_infos; +} + +std::vector BackupsWorker::getAllActiveRestoreInfos() const +{ + std::vector res_infos; + std::lock_guard lock{infos_mutex}; + for (const auto & info : infos | boost::adaptors::map_values) + { + if (info.status==BackupStatus::RESTORING) + res_infos.push_back(info); + } + return res_infos; +} + void BackupsWorker::shutdown() { bool has_active_backups_and_restores = (num_active_backups || num_active_restores); diff --git a/src/Backups/BackupsWorker.h b/src/Backups/BackupsWorker.h index a0a28b89d3a..b6d9729833e 100644 --- a/src/Backups/BackupsWorker.h +++ b/src/Backups/BackupsWorker.h @@ -103,6 +103,8 @@ private: void setStatus(const OperationID & id, BackupStatus status, bool throw_if_error = true); void setStatusSafe(const String & id, BackupStatus status) { setStatus(id, status, false); } void setNumFilesAndSize(const OperationID & id, size_t num_files, UInt64 uncompressed_size, UInt64 compressed_size); + std::vector getAllActiveBackupInfos() const; + std::vector getAllActiveRestoreInfos() const; ThreadPool backups_thread_pool; ThreadPool restores_thread_pool; diff --git a/src/Backups/RestoreSettings.cpp b/src/Backups/RestoreSettings.cpp index 2c06ee907b5..0ffa48224f7 100644 --- a/src/Backups/RestoreSettings.cpp +++ b/src/Backups/RestoreSettings.cpp @@ -7,6 +7,7 @@ #include #include #include +#include namespace DB @@ -162,7 +163,9 @@ namespace M(RestoreUDFCreationMode, create_function) \ M(Bool, internal) \ M(String, host_id) \ - M(String, coordination_zk_path) + M(String, coordination_zk_path) \ + M(OptionalUUID, backup_uuid) + RestoreSettings RestoreSettings::fromRestoreQuery(const ASTBackupQuery & query) { diff --git a/src/Backups/RestoreSettings.h b/src/Backups/RestoreSettings.h index 713adbe8029..50058d83a25 100644 --- a/src/Backups/RestoreSettings.h +++ b/src/Backups/RestoreSettings.h @@ -122,6 +122,11 @@ struct RestoreSettings /// Path in Zookeeper used to coordinate restoring process while executing by RESTORE ON CLUSTER. String coordination_zk_path; + /// Internal, should not be specified by user. + /// UUID of the backup. If it's not set it will be generated randomly. + /// This is used to validate internal restores when allow_concurrent_restores is turned off + std::optional backup_uuid; + static RestoreSettings fromRestoreQuery(const ASTBackupQuery & query); void copySettingsToQuery(ASTBackupQuery & query) const; }; diff --git a/src/Backups/SettingsFieldOptionalUUID.cpp b/src/Backups/SettingsFieldOptionalUUID.cpp new file mode 100644 index 00000000000..3f14608b206 --- /dev/null +++ b/src/Backups/SettingsFieldOptionalUUID.cpp @@ -0,0 +1,43 @@ +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int CANNOT_PARSE_BACKUP_SETTINGS; +} + + + SettingFieldOptionalUUID::SettingFieldOptionalUUID(const Field & field) + { + if (field.getType() == Field::Types::Null) + { + value = std::nullopt; + return; + } + + if (field.getType() == Field::Types::String) + { + const String & str = field.get(); + if (str.empty()) + { + value = std::nullopt; + return; + } + + UUID id; + if (tryParse(id, str)) + { + value = id; + return; + } + } + + throw Exception(ErrorCodes::CANNOT_PARSE_BACKUP_SETTINGS, "Cannot parse uuid from {}", field); + } + +} diff --git a/src/Backups/SettingsFieldOptionalUUID.h b/src/Backups/SettingsFieldOptionalUUID.h new file mode 100644 index 00000000000..d4d0a230f66 --- /dev/null +++ b/src/Backups/SettingsFieldOptionalUUID.h @@ -0,0 +1,18 @@ +#pragma once + +#include +#include + +namespace DB +{ +struct SettingFieldOptionalUUID + { + std::optional value; + + explicit SettingFieldOptionalUUID(const std::optional & value_) : value(value_) {} + + explicit SettingFieldOptionalUUID(const Field & field); + + explicit operator Field() const { return Field(value ? toString(*value) : ""); } + }; +} diff --git a/tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py b/tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py index edf92c69171..ac86a484401 100644 --- a/tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py +++ b/tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py @@ -19,12 +19,25 @@ def generate_cluster_def(): ) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w") as f: - f.write("\n\t\n\t\t\n\t\t\t\n") + f.write(""" + + + + + """) for i in range(num_nodes): - f.write( - f"\t\t\t\t\n\t\t\t\t\tnode{i}\n\t\t\t\t\t9000\n\t\t\t\t\n" - ) - f.write("\t\t\t\n\t\t\n\t\n") + f.write(""" + + node"""+str(i)+""" + 9000 + + """) + f.write(""" + + + + + """) return path From 237bb15a9fbe49336e5d7e922706e54c98b27b5f Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Tue, 17 Jan 2023 21:34:16 +0000 Subject: [PATCH 4/6] Automatic style fix --- .../test_disallow_concurrency.py | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py b/tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py index ac86a484401..8f514b95d0b 100644 --- a/tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py +++ b/tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py @@ -19,25 +19,33 @@ def generate_cluster_def(): ) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w") as f: - f.write(""" + f.write( + """ - """) + """ + ) for i in range(num_nodes): - f.write(""" + f.write( + """ - node"""+str(i)+""" + node""" + + str(i) + + """ 9000 - """) - f.write(""" + """ + ) + f.write( + """ - """) + """ + ) return path From ee526ce8776f87cc74ffef63b2e046a6102864d2 Mon Sep 17 00:00:00 2001 From: Smita Kulkarni Date: Tue, 17 Jan 2023 22:52:55 +0100 Subject: [PATCH 5/6] Fix style check - Added settings to disallow concurrent backups and restores --- src/Backups/BackupsWorker.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Backups/BackupsWorker.cpp b/src/Backups/BackupsWorker.cpp index ff68c065f50..90ded6b91d7 100644 --- a/src/Backups/BackupsWorker.cpp +++ b/src/Backups/BackupsWorker.cpp @@ -164,7 +164,7 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context if (num_active_backups && !allow_concurrent_backups) { /// If its an internal backup and we currently have 1 active backup, it could be the original query, validate using backup_uuid - if(!(num_active_backups==1 && backup_settings.internal && getAllActiveBackupInfos().at(0).id == toString(*backup_settings.backup_uuid))) + if(!(num_active_backups == 1 && backup_settings.internal && getAllActiveBackupInfos().at(0).id == toString(*backup_settings.backup_uuid))) { throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent backups not supported, turn on setting 'allow_concurrent_backups'"); } @@ -399,7 +399,7 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt if (num_active_restores && !allow_concurrent_restores) { /// If its an internal restore and we currently have 1 active restore, it could be the original query, validate using iz - if(!(num_active_restores==1 && restore_settings.internal && getAllActiveRestoreInfos().at(0).id == toString(*restore_settings.backup_uuid))) + if(!(num_active_restores == 1 && restore_settings.internal && getAllActiveRestoreInfos().at(0).id == toString(*restore_settings.backup_uuid))) { throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent restores not supported, turn on setting 'allow_concurrent_restores'"); } From d7ca742d9848c915cd23744c9490a6adc64f9e30 Mon Sep 17 00:00:00 2001 From: Smita Kulkarni Date: Wed, 18 Jan 2023 08:59:47 +0100 Subject: [PATCH 6/6] Fixed style check for beginning of if - Added settings to disallow concurrent backups and restores --- src/Backups/BackupsWorker.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Backups/BackupsWorker.cpp b/src/Backups/BackupsWorker.cpp index 90ded6b91d7..53bebaf06d7 100644 --- a/src/Backups/BackupsWorker.cpp +++ b/src/Backups/BackupsWorker.cpp @@ -164,7 +164,7 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context if (num_active_backups && !allow_concurrent_backups) { /// If its an internal backup and we currently have 1 active backup, it could be the original query, validate using backup_uuid - if(!(num_active_backups == 1 && backup_settings.internal && getAllActiveBackupInfos().at(0).id == toString(*backup_settings.backup_uuid))) + if (!(num_active_backups == 1 && backup_settings.internal && getAllActiveBackupInfos().at(0).id == toString(*backup_settings.backup_uuid))) { throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent backups not supported, turn on setting 'allow_concurrent_backups'"); } @@ -399,7 +399,7 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt if (num_active_restores && !allow_concurrent_restores) { /// If its an internal restore and we currently have 1 active restore, it could be the original query, validate using iz - if(!(num_active_restores == 1 && restore_settings.internal && getAllActiveRestoreInfos().at(0).id == toString(*restore_settings.backup_uuid))) + if (!(num_active_restores == 1 && restore_settings.internal && getAllActiveRestoreInfos().at(0).id == toString(*restore_settings.backup_uuid))) { throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent restores not supported, turn on setting 'allow_concurrent_restores'"); }