Merge pull request #47586 from ClickHouse/revert-47581-revert-47216-Backup_Restore_concurrency_check_node

Revert "Revert "Backup_Restore_concurrency_check_node""
commit 04822a63e1
Author: SmitaRKulkarni, 2023-03-23 10:02:00 +01:00 (committed by GitHub)
2 changed files with 35 additions and 19 deletions

src/Backups/BackupCoordinationRemote.cpp

@@ -810,9 +810,12 @@ bool BackupCoordinationRemote::hasConcurrentBackups(const std::atomic<size_t> &)
             if (existing_backup_uuid == toString(backup_uuid))
                 continue;
-            const auto status = zk->get(root_zookeeper_path + "/" + existing_backup_path + "/stage");
-            if (status != Stage::COMPLETED)
-                return true;
+            String status;
+            if (zk->tryGet(root_zookeeper_path + "/" + existing_backup_path + "/stage", status))
+            {
+                if (status != Stage::COMPLETED)
+                    return true;
+            }
         }
 
         zk->createIfNotExists(backup_stage_path, "");
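The functional change is the switch from zk->get to zk->tryGet: get throws if the "stage" znode is missing, which can happen when another backup's coordination nodes are cleaned up between listing the backups and reading their stages, while tryGet reports absence through its return value. A rough Python mirror of the resulting logic, written against kazoo as a stand-in for ClickHouse's C++ ZooKeeper client (the root path, stage value, and node layout below are assumptions, not ClickHouse's real layout):

    from kazoo.client import KazooClient
    from kazoo.exceptions import NoNodeError

    COMPLETED = "completed"  # stand-in for the C++ Stage::COMPLETED constant

    def has_concurrent_backups(zk: KazooClient, root: str, own_uuid: str) -> bool:
        # Mirrors the loop above: any sibling backup whose stage node exists
        # and is not COMPLETED counts as a concurrent backup.
        for existing in zk.get_children(root):
            if existing == own_uuid:
                continue  # skip our own coordination entry
            try:
                stage, _stat = zk.get(f"{root}/{existing}/stage")  # like tryGet
            except NoNodeError:
                continue  # node vanished between list and read: not concurrent
            if stage.decode() != COMPLETED:
                return True
        return False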

tests/integration/test_backup_restore_on_cluster/test_disallow_concurrency.py

@@ -9,13 +9,14 @@ from helpers.test_tools import TSV, assert_eq_with_retry
 cluster = ClickHouseCluster(__file__)
-num_nodes = 10
+num_nodes = 4
+ddl_task_timeout = 640
 
 
 def generate_cluster_def():
     path = os.path.join(
         os.path.dirname(os.path.realpath(__file__)),
-        "./_gen/cluster_for_concurrency_test.xml",
+        "./_gen/cluster_for_disallow_concurrency_test.xml",
     )
     os.makedirs(os.path.dirname(path), exist_ok=True)
     with open(path, "w") as f:
@@ -85,7 +86,7 @@ def drop_after_test():
     node0.query(
         "DROP TABLE IF EXISTS tbl ON CLUSTER 'cluster' NO DELAY",
         settings={
-            "distributed_ddl_task_timeout": 360,
+            "distributed_ddl_task_timeout": ddl_task_timeout,
         },
     )
@@ -100,6 +101,7 @@ def new_backup_name():
 def create_and_fill_table():
+    node0.query("SET mutations_sync=2")
     node0.query(
         "CREATE TABLE tbl ON CLUSTER 'cluster' ("
         "x UInt64"
@@ -107,7 +109,10 @@ def create_and_fill_table():
         "ORDER BY x"
     )
     for i in range(num_nodes):
-        nodes[i].query(f"INSERT INTO tbl SELECT number FROM numbers(40000000)")
+        nodes[i].query(f"INSERT INTO tbl SELECT number FROM numbers(100000000)")
+        nodes[i].query(
+            f"INSERT INTO tbl SELECT number+100000000 FROM numbers(100000000)"
+        )
 
 
 # All the tests have concurrent backup/restores with same backup names
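The fill step grows from a single 40M-row insert per node to two 100M-row inserts, so backups stay in flight long enough for the concurrency checks below to observe them. Back-of-envelope, using only values from the hunks above:

    # Values taken from the diff; num_nodes = 4 is set at the top of the test.
    rows_per_insert = 100_000_000
    inserts_per_node = 2
    num_nodes = 4
    rows_per_node = rows_per_insert * inserts_per_node  # 200_000_000
    total_rows = rows_per_node * num_nodes              # 800_000_000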
@@ -138,6 +143,8 @@ def test_concurrent_backups_on_same_node():
         nodes[0],
         f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'",
         "BACKUP_CREATED",
+        retry_count=100,
+        sleep_time=1,
     )
 
     # This restore part is added to confirm creating an internal backup & restore work
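Because the table is now much larger, the BACKUP can outlast the helper's default polling window, so the test passes an explicit retry_count=100 with sleep_time=1, roughly 100 seconds of polling. For orientation, a minimal sketch of the poll-and-compare loop that assert_eq_with_retry implements; the real helper lives in helpers.test_tools, takes more parameters, and its exact defaults are an assumption here:

    import time

    # Sketch only: the real helper also normalizes TSV output;
    # the retry_count/sleep_time defaults shown are assumed.
    def assert_eq_with_retry_sketch(instance, sql, expected, retry_count=20, sleep_time=0.5):
        last = None
        for _ in range(retry_count):
            last = instance.query(sql).strip()
            if last == expected:
                return
            time.sleep(sleep_time)
        raise AssertionError(f"{sql!r} returned {last!r}, expected {expected!r}")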
@@ -145,7 +152,7 @@ def test_concurrent_backups_on_same_node():
     nodes[0].query(
         f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY",
         settings={
-            "distributed_ddl_task_timeout": 360,
+            "distributed_ddl_task_timeout": ddl_task_timeout,
         },
     )
     nodes[0].query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")
@@ -174,6 +181,8 @@ def test_concurrent_backups_on_different_nodes():
         nodes[1],
         f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'",
         "BACKUP_CREATED",
+        retry_count=100,
+        sleep_time=1,
     )
@@ -197,12 +206,14 @@ def test_concurrent_restores_on_same_node():
         nodes[0],
         f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'",
         "BACKUP_CREATED",
+        retry_count=100,
+        sleep_time=1,
     )
     nodes[0].query(
         f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY",
         settings={
-            "distributed_ddl_task_timeout": 360,
+            "distributed_ddl_task_timeout": ddl_task_timeout,
         },
     )
     restore_id = (
@@ -226,44 +237,46 @@ def test_concurrent_restores_on_different_node():
     backup_name = new_backup_name()
     id = (
-        nodes[0]
+        nodes[1]
         .query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC")
         .split("\t")[0]
     )
     assert_eq_with_retry(
-        nodes[0],
+        nodes[1],
         f"SELECT status FROM system.backups WHERE status == 'CREATING_BACKUP' AND id = '{id}'",
         "CREATING_BACKUP",
     )
     assert_eq_with_retry(
-        nodes[0],
+        nodes[1],
         f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'",
         "BACKUP_CREATED",
+        retry_count=100,
+        sleep_time=1,
     )
-    nodes[0].query(
+    nodes[1].query(
         f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY",
         settings={
-            "distributed_ddl_task_timeout": 360,
+            "distributed_ddl_task_timeout": ddl_task_timeout,
         },
     )
     restore_id = (
-        nodes[0]
+        nodes[1]
         .query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name} ASYNC")
         .split("\t")[0]
     )
     assert_eq_with_retry(
-        nodes[0],
-        f"SELECT status FROM system.backups WHERE status == 'RESTORING'",
+        nodes[1],
+        f"SELECT status FROM system.backups WHERE status == 'RESTORING' AND id == '{restore_id}'",
         "RESTORING",
     )
-    assert "Concurrent restores not supported" in nodes[1].query_and_get_error(
+    assert "Concurrent restores not supported" in nodes[0].query_and_get_error(
         f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}"
     )
     assert_eq_with_retry(
-        nodes[0],
+        nodes[1],
         f"SELECT status FROM system.backups WHERE status == 'RESTORED' AND id == '{restore_id}'",
         "RESTORED",
     )
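The node swap is the substance of this last hunk: the long-running restore is now driven from nodes[1] while the conflicting restore is issued from nodes[0], and the rejection still fires because the stage nodes consulted by hasConcurrentBackups live in ZooKeeper rather than on any single server. Distilled into a helper (a sketch reusing the test's own primitives; the node roles are illustrative):

    # Assumes the same nodes, tbl, and assert_eq_with_retry as the test above.
    def assert_guard_is_cluster_wide(driver, bystander, backup_name):
        restore_id = (
            driver.query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name} ASYNC")
            .split("\t")[0]
        )
        assert_eq_with_retry(
            driver,
            f"SELECT status FROM system.backups WHERE id == '{restore_id}'",
            "RESTORING",
        )
        # A second restore from any other node must be rejected while the first
        # is in flight, because the check consults shared ZooKeeper state.
        assert "Concurrent restores not supported" in bystander.query_and_get_error(
            f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}"
        )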