mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
Merge pull request #45072 from ClickHouse/43891_Disallow_concurrent_backups_and_restores
Added settings to disallow concurrent backups and restores
This commit is contained in:
commit
6aa63414db
@ -6,7 +6,7 @@
|
|||||||
#include <Parsers/ASTSetQuery.h>
|
#include <Parsers/ASTSetQuery.h>
|
||||||
#include <Parsers/ASTLiteral.h>
|
#include <Parsers/ASTLiteral.h>
|
||||||
#include <IO/ReadHelpers.h>
|
#include <IO/ReadHelpers.h>
|
||||||
|
#include <Backups/SettingsFieldOptionalUUID.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -16,48 +16,6 @@ namespace ErrorCodes
|
|||||||
extern const int WRONG_BACKUP_SETTINGS;
|
extern const int WRONG_BACKUP_SETTINGS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
namespace
|
|
||||||
{
|
|
||||||
struct SettingFieldOptionalUUID
|
|
||||||
{
|
|
||||||
std::optional<UUID> value;
|
|
||||||
|
|
||||||
explicit SettingFieldOptionalUUID(const std::optional<UUID> & value_) : value(value_) {}
|
|
||||||
|
|
||||||
explicit SettingFieldOptionalUUID(const Field & field)
|
|
||||||
{
|
|
||||||
if (field.getType() == Field::Types::Null)
|
|
||||||
{
|
|
||||||
value = std::nullopt;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (field.getType() == Field::Types::String)
|
|
||||||
{
|
|
||||||
const String & str = field.get<const String &>();
|
|
||||||
if (str.empty())
|
|
||||||
{
|
|
||||||
value = std::nullopt;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
UUID id;
|
|
||||||
if (tryParse(id, str))
|
|
||||||
{
|
|
||||||
value = id;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
throw Exception(ErrorCodes::CANNOT_PARSE_BACKUP_SETTINGS, "Cannot parse uuid from {}", field);
|
|
||||||
}
|
|
||||||
|
|
||||||
explicit operator Field() const { return Field(value ? toString(*value) : ""); }
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// List of backup settings except base_backup_name and cluster_host_ids.
|
/// List of backup settings except base_backup_name and cluster_host_ids.
|
||||||
#define LIST_OF_BACKUP_SETTINGS(M) \
|
#define LIST_OF_BACKUP_SETTINGS(M) \
|
||||||
M(String, id) \
|
M(String, id) \
|
||||||
|
@ -30,6 +30,7 @@ namespace ErrorCodes
|
|||||||
{
|
{
|
||||||
extern const int BAD_ARGUMENTS;
|
extern const int BAD_ARGUMENTS;
|
||||||
extern const int LOGICAL_ERROR;
|
extern const int LOGICAL_ERROR;
|
||||||
|
extern const int CONCURRENT_ACCESS_NOT_SUPPORTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
using OperationID = BackupsWorker::OperationID;
|
using OperationID = BackupsWorker::OperationID;
|
||||||
@ -121,10 +122,12 @@ namespace
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
BackupsWorker::BackupsWorker(size_t num_backup_threads, size_t num_restore_threads)
|
BackupsWorker::BackupsWorker(size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_)
|
||||||
: backups_thread_pool(num_backup_threads, /* max_free_threads = */ 0, num_backup_threads)
|
: backups_thread_pool(num_backup_threads, /* max_free_threads = */ 0, num_backup_threads)
|
||||||
, restores_thread_pool(num_restore_threads, /* max_free_threads = */ 0, num_restore_threads)
|
, restores_thread_pool(num_restore_threads, /* max_free_threads = */ 0, num_restore_threads)
|
||||||
, log(&Poco::Logger::get("BackupsWorker"))
|
, log(&Poco::Logger::get("BackupsWorker"))
|
||||||
|
, allow_concurrent_backups(allow_concurrent_backups_)
|
||||||
|
, allow_concurrent_restores(allow_concurrent_restores_)
|
||||||
{
|
{
|
||||||
/// We set max_free_threads = 0 because we don't want to keep any threads if there is no BACKUP or RESTORE query running right now.
|
/// We set max_free_threads = 0 because we don't want to keep any threads if there is no BACKUP or RESTORE query running right now.
|
||||||
}
|
}
|
||||||
@ -157,6 +160,16 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
|
|||||||
else
|
else
|
||||||
backup_id = toString(*backup_settings.backup_uuid);
|
backup_id = toString(*backup_settings.backup_uuid);
|
||||||
|
|
||||||
|
/// Check if there are no concurrent backups
|
||||||
|
if (num_active_backups && !allow_concurrent_backups)
|
||||||
|
{
|
||||||
|
/// If its an internal backup and we currently have 1 active backup, it could be the original query, validate using backup_uuid
|
||||||
|
if (!(num_active_backups == 1 && backup_settings.internal && getAllActiveBackupInfos().at(0).id == toString(*backup_settings.backup_uuid)))
|
||||||
|
{
|
||||||
|
throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent backups not supported, turn on setting 'allow_concurrent_backups'");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
std::shared_ptr<IBackupCoordination> backup_coordination;
|
std::shared_ptr<IBackupCoordination> backup_coordination;
|
||||||
if (backup_settings.internal)
|
if (backup_settings.internal)
|
||||||
{
|
{
|
||||||
@ -370,6 +383,9 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
|
|||||||
auto restore_query = std::static_pointer_cast<ASTBackupQuery>(query->clone());
|
auto restore_query = std::static_pointer_cast<ASTBackupQuery>(query->clone());
|
||||||
auto restore_settings = RestoreSettings::fromRestoreQuery(*restore_query);
|
auto restore_settings = RestoreSettings::fromRestoreQuery(*restore_query);
|
||||||
|
|
||||||
|
if (!restore_settings.backup_uuid)
|
||||||
|
restore_settings.backup_uuid = UUIDHelpers::generateV4();
|
||||||
|
|
||||||
/// `restore_id` will be used as a key to the `infos` map, so it should be unique.
|
/// `restore_id` will be used as a key to the `infos` map, so it should be unique.
|
||||||
OperationID restore_id;
|
OperationID restore_id;
|
||||||
if (restore_settings.internal)
|
if (restore_settings.internal)
|
||||||
@ -377,7 +393,17 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
|
|||||||
else if (!restore_settings.id.empty())
|
else if (!restore_settings.id.empty())
|
||||||
restore_id = restore_settings.id;
|
restore_id = restore_settings.id;
|
||||||
else
|
else
|
||||||
restore_id = toString(UUIDHelpers::generateV4());
|
restore_id = toString(*restore_settings.backup_uuid);
|
||||||
|
|
||||||
|
/// Check if there are no concurrent restores
|
||||||
|
if (num_active_restores && !allow_concurrent_restores)
|
||||||
|
{
|
||||||
|
/// If its an internal restore and we currently have 1 active restore, it could be the original query, validate using iz
|
||||||
|
if (!(num_active_restores == 1 && restore_settings.internal && getAllActiveRestoreInfos().at(0).id == toString(*restore_settings.backup_uuid)))
|
||||||
|
{
|
||||||
|
throw Exception(ErrorCodes::CONCURRENT_ACCESS_NOT_SUPPORTED, "Concurrent restores not supported, turn on setting 'allow_concurrent_restores'");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
std::shared_ptr<IRestoreCoordination> restore_coordination;
|
std::shared_ptr<IRestoreCoordination> restore_coordination;
|
||||||
if (restore_settings.internal)
|
if (restore_settings.internal)
|
||||||
@ -471,6 +497,7 @@ void BackupsWorker::doRestore(
|
|||||||
backup_open_params.context = context;
|
backup_open_params.context = context;
|
||||||
backup_open_params.backup_info = backup_info;
|
backup_open_params.backup_info = backup_info;
|
||||||
backup_open_params.base_backup_info = restore_settings.base_backup_info;
|
backup_open_params.base_backup_info = restore_settings.base_backup_info;
|
||||||
|
backup_open_params.backup_uuid = restore_settings.backup_uuid;
|
||||||
backup_open_params.password = restore_settings.password;
|
backup_open_params.password = restore_settings.password;
|
||||||
BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params);
|
BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params);
|
||||||
|
|
||||||
@ -687,6 +714,30 @@ std::vector<BackupsWorker::Info> BackupsWorker::getAllInfos() const
|
|||||||
return res_infos;
|
return res_infos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::vector<BackupsWorker::Info> BackupsWorker::getAllActiveBackupInfos() const
|
||||||
|
{
|
||||||
|
std::vector<Info> res_infos;
|
||||||
|
std::lock_guard lock{infos_mutex};
|
||||||
|
for (const auto & info : infos | boost::adaptors::map_values)
|
||||||
|
{
|
||||||
|
if (info.status==BackupStatus::CREATING_BACKUP)
|
||||||
|
res_infos.push_back(info);
|
||||||
|
}
|
||||||
|
return res_infos;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<BackupsWorker::Info> BackupsWorker::getAllActiveRestoreInfos() const
|
||||||
|
{
|
||||||
|
std::vector<Info> res_infos;
|
||||||
|
std::lock_guard lock{infos_mutex};
|
||||||
|
for (const auto & info : infos | boost::adaptors::map_values)
|
||||||
|
{
|
||||||
|
if (info.status==BackupStatus::RESTORING)
|
||||||
|
res_infos.push_back(info);
|
||||||
|
}
|
||||||
|
return res_infos;
|
||||||
|
}
|
||||||
|
|
||||||
void BackupsWorker::shutdown()
|
void BackupsWorker::shutdown()
|
||||||
{
|
{
|
||||||
bool has_active_backups_and_restores = (num_active_backups || num_active_restores);
|
bool has_active_backups_and_restores = (num_active_backups || num_active_restores);
|
||||||
|
@ -23,7 +23,7 @@ class IRestoreCoordination;
|
|||||||
class BackupsWorker
|
class BackupsWorker
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
BackupsWorker(size_t num_backup_threads, size_t num_restore_threads);
|
BackupsWorker(size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_);
|
||||||
|
|
||||||
/// Waits until all tasks have been completed.
|
/// Waits until all tasks have been completed.
|
||||||
void shutdown();
|
void shutdown();
|
||||||
@ -103,6 +103,8 @@ private:
|
|||||||
void setStatus(const OperationID & id, BackupStatus status, bool throw_if_error = true);
|
void setStatus(const OperationID & id, BackupStatus status, bool throw_if_error = true);
|
||||||
void setStatusSafe(const String & id, BackupStatus status) { setStatus(id, status, false); }
|
void setStatusSafe(const String & id, BackupStatus status) { setStatus(id, status, false); }
|
||||||
void setNumFilesAndSize(const OperationID & id, size_t num_files, UInt64 uncompressed_size, UInt64 compressed_size);
|
void setNumFilesAndSize(const OperationID & id, size_t num_files, UInt64 uncompressed_size, UInt64 compressed_size);
|
||||||
|
std::vector<Info> getAllActiveBackupInfos() const;
|
||||||
|
std::vector<Info> getAllActiveRestoreInfos() const;
|
||||||
|
|
||||||
ThreadPool backups_thread_pool;
|
ThreadPool backups_thread_pool;
|
||||||
ThreadPool restores_thread_pool;
|
ThreadPool restores_thread_pool;
|
||||||
@ -113,6 +115,8 @@ private:
|
|||||||
std::atomic<size_t> num_active_restores = 0;
|
std::atomic<size_t> num_active_restores = 0;
|
||||||
mutable std::mutex infos_mutex;
|
mutable std::mutex infos_mutex;
|
||||||
Poco::Logger * log;
|
Poco::Logger * log;
|
||||||
|
const bool allow_concurrent_backups;
|
||||||
|
const bool allow_concurrent_restores;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -7,6 +7,7 @@
|
|||||||
#include <Parsers/ASTSetQuery.h>
|
#include <Parsers/ASTSetQuery.h>
|
||||||
#include <boost/algorithm/string/predicate.hpp>
|
#include <boost/algorithm/string/predicate.hpp>
|
||||||
#include <Common/FieldVisitorConvertToNumber.h>
|
#include <Common/FieldVisitorConvertToNumber.h>
|
||||||
|
#include <Backups/SettingsFieldOptionalUUID.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -162,7 +163,9 @@ namespace
|
|||||||
M(RestoreUDFCreationMode, create_function) \
|
M(RestoreUDFCreationMode, create_function) \
|
||||||
M(Bool, internal) \
|
M(Bool, internal) \
|
||||||
M(String, host_id) \
|
M(String, host_id) \
|
||||||
M(String, coordination_zk_path)
|
M(String, coordination_zk_path) \
|
||||||
|
M(OptionalUUID, backup_uuid)
|
||||||
|
|
||||||
|
|
||||||
RestoreSettings RestoreSettings::fromRestoreQuery(const ASTBackupQuery & query)
|
RestoreSettings RestoreSettings::fromRestoreQuery(const ASTBackupQuery & query)
|
||||||
{
|
{
|
||||||
|
@ -122,6 +122,11 @@ struct RestoreSettings
|
|||||||
/// Path in Zookeeper used to coordinate restoring process while executing by RESTORE ON CLUSTER.
|
/// Path in Zookeeper used to coordinate restoring process while executing by RESTORE ON CLUSTER.
|
||||||
String coordination_zk_path;
|
String coordination_zk_path;
|
||||||
|
|
||||||
|
/// Internal, should not be specified by user.
|
||||||
|
/// UUID of the backup. If it's not set it will be generated randomly.
|
||||||
|
/// This is used to validate internal restores when allow_concurrent_restores is turned off
|
||||||
|
std::optional<UUID> backup_uuid;
|
||||||
|
|
||||||
static RestoreSettings fromRestoreQuery(const ASTBackupQuery & query);
|
static RestoreSettings fromRestoreQuery(const ASTBackupQuery & query);
|
||||||
void copySettingsToQuery(ASTBackupQuery & query) const;
|
void copySettingsToQuery(ASTBackupQuery & query) const;
|
||||||
};
|
};
|
||||||
|
43
src/Backups/SettingsFieldOptionalUUID.cpp
Normal file
43
src/Backups/SettingsFieldOptionalUUID.cpp
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
#include <Backups/SettingsFieldOptionalUUID.h>
|
||||||
|
#include <Common/ErrorCodes.h>
|
||||||
|
#include <Core/SettingsFields.h>
|
||||||
|
#include <IO/ReadHelpers.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int CANNOT_PARSE_BACKUP_SETTINGS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
SettingFieldOptionalUUID::SettingFieldOptionalUUID(const Field & field)
|
||||||
|
{
|
||||||
|
if (field.getType() == Field::Types::Null)
|
||||||
|
{
|
||||||
|
value = std::nullopt;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (field.getType() == Field::Types::String)
|
||||||
|
{
|
||||||
|
const String & str = field.get<const String &>();
|
||||||
|
if (str.empty())
|
||||||
|
{
|
||||||
|
value = std::nullopt;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
UUID id;
|
||||||
|
if (tryParse(id, str))
|
||||||
|
{
|
||||||
|
value = id;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw Exception(ErrorCodes::CANNOT_PARSE_BACKUP_SETTINGS, "Cannot parse uuid from {}", field);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
18
src/Backups/SettingsFieldOptionalUUID.h
Normal file
18
src/Backups/SettingsFieldOptionalUUID.h
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <optional>
|
||||||
|
#include <Core/SettingsFields.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
struct SettingFieldOptionalUUID
|
||||||
|
{
|
||||||
|
std::optional<UUID> value;
|
||||||
|
|
||||||
|
explicit SettingFieldOptionalUUID(const std::optional<UUID> & value_) : value(value_) {}
|
||||||
|
|
||||||
|
explicit SettingFieldOptionalUUID(const Field & field);
|
||||||
|
|
||||||
|
explicit operator Field() const { return Field(value ? toString(*value) : ""); }
|
||||||
|
};
|
||||||
|
}
|
@ -1888,8 +1888,11 @@ BackupsWorker & Context::getBackupsWorker() const
|
|||||||
{
|
{
|
||||||
auto lock = getLock();
|
auto lock = getLock();
|
||||||
|
|
||||||
|
const bool allow_concurrent_backups = this->getConfigRef().getBool("allow_concurrent_backups", true);
|
||||||
|
const bool allow_concurrent_restores = this->getConfigRef().getBool("allow_concurrent_restores", true);
|
||||||
|
|
||||||
if (!shared->backups_worker)
|
if (!shared->backups_worker)
|
||||||
shared->backups_worker.emplace(getSettingsRef().backup_threads, getSettingsRef().restore_threads);
|
shared->backups_worker.emplace(getSettingsRef().backup_threads, getSettingsRef().restore_threads, allow_concurrent_backups, allow_concurrent_restores);
|
||||||
|
|
||||||
return *shared->backups_worker;
|
return *shared->backups_worker;
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,15 @@
|
|||||||
|
<clickhouse>
|
||||||
|
<storage_configuration>
|
||||||
|
<disks>
|
||||||
|
<backups>
|
||||||
|
<type>local</type>
|
||||||
|
<path>/backups/</path>
|
||||||
|
</backups>
|
||||||
|
</disks>
|
||||||
|
</storage_configuration>
|
||||||
|
<backups>
|
||||||
|
<allowed_disk>backups</allowed_disk>
|
||||||
|
</backups>
|
||||||
|
<allow_concurrent_backups>false</allow_concurrent_backups>
|
||||||
|
<allow_concurrent_restores>false</allow_concurrent_restores>
|
||||||
|
</clickhouse>
|
@ -0,0 +1,227 @@
|
|||||||
|
from random import randint
|
||||||
|
import pytest
|
||||||
|
import os.path
|
||||||
|
import time
|
||||||
|
import concurrent
|
||||||
|
from helpers.cluster import ClickHouseCluster
|
||||||
|
from helpers.test_tools import TSV, assert_eq_with_retry
|
||||||
|
|
||||||
|
|
||||||
|
cluster = ClickHouseCluster(__file__)
|
||||||
|
|
||||||
|
num_nodes = 10
|
||||||
|
|
||||||
|
|
||||||
|
def generate_cluster_def():
|
||||||
|
path = os.path.join(
|
||||||
|
os.path.dirname(os.path.realpath(__file__)),
|
||||||
|
"./_gen/cluster_for_concurrency_test.xml",
|
||||||
|
)
|
||||||
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||||
|
with open(path, "w") as f:
|
||||||
|
f.write(
|
||||||
|
"""
|
||||||
|
<clickhouse>
|
||||||
|
<remote_servers>
|
||||||
|
<cluster>
|
||||||
|
<shard>
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
for i in range(num_nodes):
|
||||||
|
f.write(
|
||||||
|
"""
|
||||||
|
<replica>
|
||||||
|
<host>node"""
|
||||||
|
+ str(i)
|
||||||
|
+ """</host>
|
||||||
|
<port>9000</port>
|
||||||
|
</replica>
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
f.write(
|
||||||
|
"""
|
||||||
|
</shard>
|
||||||
|
</cluster>
|
||||||
|
</remote_servers>
|
||||||
|
</clickhouse>
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
main_configs = ["configs/disallow_concurrency.xml", generate_cluster_def()]
|
||||||
|
user_configs = ["configs/allow_database_types.xml"]
|
||||||
|
|
||||||
|
nodes = []
|
||||||
|
for i in range(num_nodes):
|
||||||
|
nodes.append(
|
||||||
|
cluster.add_instance(
|
||||||
|
f"node{i}",
|
||||||
|
main_configs=main_configs,
|
||||||
|
user_configs=user_configs,
|
||||||
|
external_dirs=["/backups/"],
|
||||||
|
macros={"replica": f"node{i}", "shard": "shard1"},
|
||||||
|
with_zookeeper=True,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
node0 = nodes[0]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module", autouse=True)
|
||||||
|
def start_cluster():
|
||||||
|
try:
|
||||||
|
cluster.start()
|
||||||
|
yield cluster
|
||||||
|
finally:
|
||||||
|
cluster.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def drop_after_test():
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
node0.query("DROP TABLE IF EXISTS tbl ON CLUSTER 'cluster' NO DELAY")
|
||||||
|
node0.query("DROP DATABASE IF EXISTS mydb ON CLUSTER 'cluster' NO DELAY")
|
||||||
|
|
||||||
|
|
||||||
|
backup_id_counter = 0
|
||||||
|
|
||||||
|
|
||||||
|
def new_backup_name():
|
||||||
|
global backup_id_counter
|
||||||
|
backup_id_counter += 1
|
||||||
|
return f"Disk('backups', '{backup_id_counter}')"
|
||||||
|
|
||||||
|
|
||||||
|
def create_and_fill_table():
|
||||||
|
node0.query(
|
||||||
|
"CREATE TABLE tbl ON CLUSTER 'cluster' ("
|
||||||
|
"x UInt64"
|
||||||
|
") ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')"
|
||||||
|
"ORDER BY x"
|
||||||
|
)
|
||||||
|
for i in range(num_nodes):
|
||||||
|
nodes[i].query(f"INSERT INTO tbl SELECT number FROM numbers(40000000)")
|
||||||
|
|
||||||
|
|
||||||
|
# All the tests have concurrent backup/restores with same backup names
|
||||||
|
# The same works with different backup names too. Since concurrency
|
||||||
|
# check comes before backup name check, separate tests are not added for different names
|
||||||
|
|
||||||
|
|
||||||
|
def test_concurrent_backups_on_same_node():
|
||||||
|
create_and_fill_table()
|
||||||
|
|
||||||
|
backup_name = new_backup_name()
|
||||||
|
|
||||||
|
id = (
|
||||||
|
nodes[0]
|
||||||
|
.query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC")
|
||||||
|
.split("\t")[0]
|
||||||
|
)
|
||||||
|
assert_eq_with_retry(
|
||||||
|
nodes[0],
|
||||||
|
f"SELECT status FROM system.backups WHERE status == 'CREATING_BACKUP' AND id = '{id}'",
|
||||||
|
"CREATING_BACKUP",
|
||||||
|
)
|
||||||
|
assert "Concurrent backups not supported" in nodes[0].query_and_get_error(
|
||||||
|
f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name}"
|
||||||
|
)
|
||||||
|
|
||||||
|
assert_eq_with_retry(
|
||||||
|
nodes[0],
|
||||||
|
f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'",
|
||||||
|
"BACKUP_CREATED",
|
||||||
|
)
|
||||||
|
|
||||||
|
# This restore part is added to confirm creating an internal backup & restore work
|
||||||
|
# even when a concurrent backup is stopped
|
||||||
|
nodes[0].query(f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY")
|
||||||
|
nodes[0].query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")
|
||||||
|
nodes[0].query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' tbl")
|
||||||
|
|
||||||
|
|
||||||
|
def test_concurrent_backups_on_different_nodes():
|
||||||
|
create_and_fill_table()
|
||||||
|
|
||||||
|
backup_name = new_backup_name()
|
||||||
|
|
||||||
|
nodes[1].query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC")
|
||||||
|
assert_eq_with_retry(
|
||||||
|
nodes[1],
|
||||||
|
f"SELECT status FROM system.backups WHERE status == 'CREATING_BACKUP'",
|
||||||
|
"CREATING_BACKUP",
|
||||||
|
)
|
||||||
|
assert "Concurrent backups not supported" in nodes[2].query_and_get_error(
|
||||||
|
f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_concurrent_restores_on_same_node():
|
||||||
|
create_and_fill_table()
|
||||||
|
|
||||||
|
backup_name = new_backup_name()
|
||||||
|
|
||||||
|
id = (
|
||||||
|
nodes[0]
|
||||||
|
.query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC")
|
||||||
|
.split("\t")[0]
|
||||||
|
)
|
||||||
|
assert_eq_with_retry(
|
||||||
|
nodes[0],
|
||||||
|
f"SELECT status FROM system.backups WHERE status == 'CREATING_BACKUP' AND id = '{id}'",
|
||||||
|
"CREATING_BACKUP",
|
||||||
|
)
|
||||||
|
|
||||||
|
assert_eq_with_retry(
|
||||||
|
nodes[0],
|
||||||
|
f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'",
|
||||||
|
"BACKUP_CREATED",
|
||||||
|
)
|
||||||
|
|
||||||
|
nodes[0].query(f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY")
|
||||||
|
nodes[0].query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name} ASYNC")
|
||||||
|
assert_eq_with_retry(
|
||||||
|
nodes[0],
|
||||||
|
f"SELECT status FROM system.backups WHERE status == 'RESTORING'",
|
||||||
|
"RESTORING",
|
||||||
|
)
|
||||||
|
assert "Concurrent restores not supported" in nodes[0].query_and_get_error(
|
||||||
|
f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_concurrent_restores_on_different_node():
|
||||||
|
create_and_fill_table()
|
||||||
|
|
||||||
|
backup_name = new_backup_name()
|
||||||
|
|
||||||
|
id = (
|
||||||
|
nodes[0]
|
||||||
|
.query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC")
|
||||||
|
.split("\t")[0]
|
||||||
|
)
|
||||||
|
assert_eq_with_retry(
|
||||||
|
nodes[0],
|
||||||
|
f"SELECT status FROM system.backups WHERE status == 'CREATING_BACKUP' AND id = '{id}'",
|
||||||
|
"CREATING_BACKUP",
|
||||||
|
)
|
||||||
|
|
||||||
|
assert_eq_with_retry(
|
||||||
|
nodes[0],
|
||||||
|
f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'",
|
||||||
|
"BACKUP_CREATED",
|
||||||
|
)
|
||||||
|
|
||||||
|
nodes[0].query(f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY")
|
||||||
|
nodes[0].query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name} ASYNC")
|
||||||
|
assert_eq_with_retry(
|
||||||
|
nodes[0],
|
||||||
|
f"SELECT status FROM system.backups WHERE status == 'RESTORING'",
|
||||||
|
"RESTORING",
|
||||||
|
)
|
||||||
|
assert "Concurrent restores not supported" in nodes[1].query_and_get_error(
|
||||||
|
f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}"
|
||||||
|
)
|
Loading…
Reference in New Issue
Block a user