Add tests for cancelling backup/restore queries.

This commit is contained in:
Vitaly Baranov 2024-01-14 22:13:02 +01:00
parent 3d7a17af08
commit 185f6cc37c
5 changed files with 229 additions and 6 deletions

View File

@ -339,10 +339,11 @@ private:
}; };
BackupsWorker::BackupsWorker(ContextPtr global_context, size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_) BackupsWorker::BackupsWorker(ContextPtr global_context, size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_, bool test_inject_sleep_)
: thread_pools(std::make_unique<ThreadPools>(num_backup_threads, num_restore_threads)) : thread_pools(std::make_unique<ThreadPools>(num_backup_threads, num_restore_threads))
, allow_concurrent_backups(allow_concurrent_backups_) , allow_concurrent_backups(allow_concurrent_backups_)
, allow_concurrent_restores(allow_concurrent_restores_) , allow_concurrent_restores(allow_concurrent_restores_)
, test_inject_sleep(test_inject_sleep_)
, log(&Poco::Logger::get("BackupsWorker")) , log(&Poco::Logger::get("BackupsWorker"))
{ {
backup_log = global_context->getBackupLog(); backup_log = global_context->getBackupLog();
@ -700,9 +701,10 @@ void BackupsWorker::writeBackupEntries(
if (process_list_element) if (process_list_element)
process_list_element->checkTimeLimit(); process_list_element->checkTimeLimit();
sleepForSeconds(5);
backup->writeFile(file_info, std::move(entry)); backup->writeFile(file_info, std::move(entry));
maybeSleepForTesting();
// Update metadata // Update metadata
if (!internal) if (!internal)
{ {
@ -715,7 +717,6 @@ void BackupsWorker::writeBackupEntries(
backup->getCompressedSize(), backup->getCompressedSize(),
0, 0); 0, 0);
} }
} }
catch (...) catch (...)
{ {
@ -1022,6 +1023,9 @@ void BackupsWorker::restoreTablesData(const OperationID & restore_id, BackupPtr
process_list_element->checkTimeLimit(); process_list_element->checkTimeLimit();
std::move(task)(); std::move(task)();
maybeSleepForTesting();
setNumFilesAndSize( setNumFilesAndSize(
restore_id, restore_id,
backup->getNumFiles(), backup->getNumFiles(),
@ -1142,6 +1146,13 @@ void BackupsWorker::setNumFilesAndSize(const OperationID & id, size_t num_files,
} }
void BackupsWorker::maybeSleepForTesting() const
{
if (test_inject_sleep)
sleepForSeconds(1);
}
void BackupsWorker::wait(const OperationID & id, bool rethrow_exception) void BackupsWorker::wait(const OperationID & id, bool rethrow_exception)
{ {
std::unique_lock lock{infos_mutex}; std::unique_lock lock{infos_mutex};

View File

@ -35,7 +35,7 @@ using QueryStatusPtr = std::shared_ptr<QueryStatus>;
class BackupsWorker class BackupsWorker
{ {
public: public:
BackupsWorker(ContextPtr global_context, size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_); BackupsWorker(ContextPtr global_context, size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_, bool test_inject_sleep_);
~BackupsWorker(); ~BackupsWorker();
/// Waits until all tasks have been completed. /// Waits until all tasks have been completed.
@ -95,11 +95,16 @@ private:
enum class ThreadPoolId; enum class ThreadPoolId;
ThreadPool & getThreadPool(ThreadPoolId thread_pool_id); ThreadPool & getThreadPool(ThreadPoolId thread_pool_id);
/// Waits for some time if `test_inject_sleep` is true.
void maybeSleepForTesting() const;
class ThreadPools; class ThreadPools;
std::unique_ptr<ThreadPools> thread_pools; std::unique_ptr<ThreadPools> thread_pools;
const bool allow_concurrent_backups; const bool allow_concurrent_backups;
const bool allow_concurrent_restores; const bool allow_concurrent_restores;
const bool test_inject_sleep;
Poco::Logger * log; Poco::Logger * log;
std::unordered_map<BackupOperationID, BackupOperationInfo> infos; std::unordered_map<BackupOperationID, BackupOperationInfo> infos;

View File

@ -2535,12 +2535,13 @@ BackupsWorker & Context::getBackupsWorker() const
const auto & config = getConfigRef(); const auto & config = getConfigRef();
const bool allow_concurrent_backups = config.getBool("backups.allow_concurrent_backups", true); const bool allow_concurrent_backups = config.getBool("backups.allow_concurrent_backups", true);
const bool allow_concurrent_restores = config.getBool("backups.allow_concurrent_restores", true); const bool allow_concurrent_restores = config.getBool("backups.allow_concurrent_restores", true);
const bool test_inject_sleep = config.getBool("backups.test_inject_sleep", false);
const auto & settings_ref = getSettingsRef(); const auto & settings_ref = getSettingsRef();
UInt64 backup_threads = config.getUInt64("backup_threads", settings_ref.backup_threads); UInt64 backup_threads = config.getUInt64("backup_threads", settings_ref.backup_threads);
UInt64 restore_threads = config.getUInt64("restore_threads", settings_ref.restore_threads); UInt64 restore_threads = config.getUInt64("restore_threads", settings_ref.restore_threads);
shared->backups_worker.emplace(getGlobalContext(), backup_threads, restore_threads, allow_concurrent_backups, allow_concurrent_restores); shared->backups_worker.emplace(getGlobalContext(), backup_threads, restore_threads, allow_concurrent_backups, allow_concurrent_restores, test_inject_sleep);
}); });
return *shared->backups_worker; return *shared->backups_worker;

View File

@ -0,0 +1,7 @@
<clickhouse>
<backups>
<test_inject_sleep>true</test_inject_sleep>
</backups>
<backup_threads>2</backup_threads>
<restore_threads>2</restore_threads>
</clickhouse>

View File

@ -0,0 +1,199 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV, assert_eq_with_retry
import uuid
cluster = ClickHouseCluster(__file__)
main_configs = [
"configs/backups_disk.xml",
"configs/slow_backups.xml",
]
node = cluster.add_instance(
"node",
main_configs=main_configs,
external_dirs=["/backups/"],
)
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def drop_after_test():
try:
yield
finally:
node.query("DROP TABLE IF EXISTS tbl SYNC")
# Generate the backup name.
def get_backup_name(backup_id):
return f"Disk('backups', '{backup_id}')"
# Start making a backup asynchronously.
def start_backup(backup_id):
node.query(
f"BACKUP TABLE tbl TO {get_backup_name(backup_id)} SETTINGS id='{backup_id}' ASYNC"
)
assert (
node.query(f"SELECT status FROM system.backups WHERE id='{backup_id}'")
== "CREATING_BACKUP\n"
)
assert (
node.query(
f"SELECT count() FROM system.processes WHERE query_kind='Backup' AND query LIKE '%{backup_id}%'"
)
== "1\n"
)
# Wait for the backup to be completed.
def wait_backup(backup_id):
assert_eq_with_retry(
node,
f"SELECT status FROM system.backups WHERE id='{backup_id}'",
"BACKUP_CREATED",
retry_count=60,
sleep_time=5,
)
backup_duration = int(
node.query(
f"SELECT end_time - start_time FROM system.backups WHERE id='{backup_id}'"
)
)
assert backup_duration >= 3 # Backup is not expected to be too quick in this test.
# Cancel the specified backup.
def cancel_backup(backup_id):
node.query(
f"KILL QUERY WHERE query_kind='Backup' AND query LIKE '%{backup_id}%' SYNC"
)
assert (
node.query(f"SELECT status FROM system.backups WHERE id='{backup_id}'")
== "BACKUP_FAILED\n"
)
expected_error = "QUERY_WAS_CANCELLED"
assert expected_error in node.query(
f"SELECT error FROM system.backups WHERE id='{backup_id}'"
)
assert (
node.query(
f"SELECT count() FROM system.processes WHERE query_kind='Backup' AND query LIKE '%{backup_id}%'"
)
== "0\n"
)
node.query("SYSTEM FLUSH LOGS")
kill_duration_ms = int(
node.query(
f"SELECT query_duration_ms FROM system.query_log WHERE query_kind='KillQuery' AND query LIKE '%{backup_id}%' AND type='QueryFinish'"
)
)
assert kill_duration_ms < 2000 # Query must be cancelled quickly
# Start restoring from a backup.
def start_restore(restore_id, backup_id):
node.query(
f"RESTORE TABLE tbl FROM {get_backup_name(backup_id)} SETTINGS id='{restore_id}' ASYNC"
)
assert (
node.query(f"SELECT status FROM system.backups WHERE id='{restore_id}'")
== "RESTORING\n"
)
assert (
node.query(
f"SELECT count() FROM system.processes WHERE query_kind='Restore' AND query LIKE '%{restore_id}%'"
)
== "1\n"
)
# Wait for the restore operation to be completed.
def wait_restore(restore_id):
assert_eq_with_retry(
node,
f"SELECT status FROM system.backups WHERE id='{restore_id}'",
"RESTORED",
retry_count=60,
sleep_time=5,
)
restore_duration = int(
node.query(
f"SELECT end_time - start_time FROM system.backups WHERE id='{restore_id}'"
)
)
assert (
restore_duration >= 3
) # Restore is not expected to be too quick in this test.
# Cancel the specified restore operation.
def cancel_restore(restore_id):
node.query(
f"KILL QUERY WHERE query_kind='Restore' AND query LIKE '%{restore_id}%' SYNC"
)
assert (
node.query(f"SELECT status FROM system.backups WHERE id='{restore_id}'")
== "RESTORE_FAILED\n"
)
expected_error = "QUERY_WAS_CANCELLED"
assert expected_error in node.query(
f"SELECT error FROM system.backups WHERE id='{restore_id}'"
)
assert (
node.query(
f"SELECT count() FROM system.processes WHERE query_kind='Restore' AND query LIKE '%{restore_id}%'"
)
== "0\n"
)
node.query("SYSTEM FLUSH LOGS")
kill_duration_ms = int(
node.query(
f"SELECT query_duration_ms FROM system.query_log WHERE query_kind='KillQuery' AND query LIKE '%{restore_id}%' AND type='QueryFinish'"
)
)
assert kill_duration_ms < 2000 # Query must be cancelled quickly
def test_cancel_backup():
# We use partitioning so backups would contain more files.
node.query(
"CREATE TABLE tbl (x UInt64) ENGINE=MergeTree() ORDER BY tuple() PARTITION BY x%5"
)
node.query(f"INSERT INTO tbl SELECT number FROM numbers(500)")
try_backup_id_1 = uuid.uuid4().hex
start_backup(try_backup_id_1)
cancel_backup(try_backup_id_1)
backup_id = uuid.uuid4().hex
start_backup(backup_id)
wait_backup(backup_id)
node.query(f"DROP TABLE tbl SYNC")
try_restore_id_1 = uuid.uuid4().hex
start_restore(try_restore_id_1, backup_id)
cancel_restore(try_restore_id_1)
node.query(f"DROP TABLE tbl SYNC")
restore_id = uuid.uuid4().hex
start_restore(restore_id, backup_id)
wait_restore(restore_id)