diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp
index 0bebf91df97..ed960528abe 100644
--- a/src/Disks/S3/DiskS3.cpp
+++ b/src/Disks/S3/DiskS3.cpp
@@ -999,6 +999,7 @@ void DiskS3::restoreFileOperations(const RestoreInformation & restore_informatio
                 if (metadata_disk->exists(to_path))
                     metadata_disk->removeRecursive(to_path);
 
+                createDirectories(directoryPath(to_path));
                 metadata_disk->moveDirectory(from_path, to_path);
             }
         }
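The one-line C++ change above is the actual fix: with an Atomic database a table's files live under a nested store/<prefix>/<uuid>/ path, and when restoreFileOperations replays a logged rename on a fresh node, the parent directory of to_path may not exist yet on the metadata disk, so moveDirectory would fail. Creating the parents first makes the replay independent of which directories already exist. A minimal Python sketch of the same guard (a hypothetical replay helper for illustration, not the C++ implementation):

    import os
    import shutil

    def replay_rename(from_path, to_path):
        # Drop whatever an earlier replay may have left at the destination.
        if os.path.exists(to_path):
            shutil.rmtree(to_path)
        # The fix: ensure the parent exists before moving. For an Atomic
        # database the destination looks like store/abc/abcdef01-.../,
        # and store/abc/ may not have been recreated on the new node yet.
        os.makedirs(os.path.dirname(to_path), exist_ok=True)
        shutil.move(from_path, to_path)
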
"test") + create_table(node, "test", db_atomic=db_atomic) + uuid = get_table_uuid(node, db_atomic, "test") node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1))) @@ -186,7 +201,7 @@ def test_restore_another_bucket_path(cluster): create_restore_file(node_another_bucket, bucket="root") node_another_bucket.query("SYSTEM RESTART DISK s3") - create_table(node_another_bucket, "test", attach=True) + create_table(node_another_bucket, "test", attach=True, db_atomic=db_atomic, uuid=uuid) assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) @@ -195,16 +210,20 @@ def test_restore_another_bucket_path(cluster): create_restore_file(node_another_bucket_path, bucket="root2", path="data") node_another_bucket_path.query("SYSTEM RESTART DISK s3") - create_table(node_another_bucket_path, "test", attach=True) + create_table(node_another_bucket_path, "test", attach=True, db_atomic=db_atomic, uuid=uuid) assert node_another_bucket_path.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4) assert node_another_bucket_path.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) -def test_restore_different_revisions(cluster): +@pytest.mark.parametrize( + "db_atomic", [False, True] +) +def test_restore_different_revisions(cluster, db_atomic): node = cluster.instances["node"] - create_table(node, "test") + create_table(node, "test", db_atomic=db_atomic) + uuid = get_table_uuid(node, db_atomic, "test") node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1))) @@ -233,7 +252,7 @@ def test_restore_different_revisions(cluster): # Restore to revision 1 (2 parts). create_restore_file(node_another_bucket, revision=revision1, bucket="root") node_another_bucket.query("SYSTEM RESTART DISK s3") - create_table(node_another_bucket, "test", attach=True) + create_table(node_another_bucket, "test", attach=True, db_atomic=db_atomic, uuid=uuid) assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2) assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0) @@ -260,10 +279,14 @@ def test_restore_different_revisions(cluster): assert node_another_bucket.query("SELECT count(*) from system.parts where table = 'test'") == '5\n' -def test_restore_mutations(cluster): +@pytest.mark.parametrize( + "db_atomic", [False, True] +) +def test_restore_mutations(cluster, db_atomic): node = cluster.instances["node"] - create_table(node, "test") + create_table(node, "test", db_atomic=db_atomic) + uuid = get_table_uuid(node, db_atomic, "test") node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096))) node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096, -1))) @@ -281,7 +304,7 @@ def test_restore_mutations(cluster): # Restore to revision before mutation. 
@@ -146,10 +154,13 @@
 @pytest.mark.parametrize(
     "replicated", [False, True]
 )
-def test_full_restore(cluster, replicated):
+@pytest.mark.parametrize(
+    "db_atomic", [False, True]
+)
+def test_full_restore(cluster, replicated, db_atomic):
     node = cluster.instances["node"]
 
-    create_table(node, "test", attach=False, replicated=replicated)
+    create_table(node, "test", attach=False, replicated=replicated, db_atomic=db_atomic)
 
     node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096)))
     node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1)))
@@ -166,10 +177,14 @@
     assert node.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
 
 
-def test_restore_another_bucket_path(cluster):
+@pytest.mark.parametrize(
+    "db_atomic", [False, True]
+)
+def test_restore_another_bucket_path(cluster, db_atomic):
     node = cluster.instances["node"]
 
-    create_table(node, "test")
+    create_table(node, "test", db_atomic=db_atomic)
+    uuid = get_table_uuid(node, db_atomic, "test")
 
     node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096)))
     node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1)))
@@ -186,7 +201,7 @@
 
     create_restore_file(node_another_bucket, bucket="root")
     node_another_bucket.query("SYSTEM RESTART DISK s3")
-    create_table(node_another_bucket, "test", attach=True)
+    create_table(node_another_bucket, "test", attach=True, db_atomic=db_atomic, uuid=uuid)
 
     assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
     assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@@ -195,16 +210,20 @@
 
     create_restore_file(node_another_bucket_path, bucket="root2", path="data")
     node_another_bucket_path.query("SYSTEM RESTART DISK s3")
-    create_table(node_another_bucket_path, "test", attach=True)
+    create_table(node_another_bucket_path, "test", attach=True, db_atomic=db_atomic, uuid=uuid)
 
     assert node_another_bucket_path.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
     assert node_another_bucket_path.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
 
 
-def test_restore_different_revisions(cluster):
+@pytest.mark.parametrize(
+    "db_atomic", [False, True]
+)
+def test_restore_different_revisions(cluster, db_atomic):
     node = cluster.instances["node"]
 
-    create_table(node, "test")
+    create_table(node, "test", db_atomic=db_atomic)
+    uuid = get_table_uuid(node, db_atomic, "test")
 
     node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096)))
     node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1)))
@@ -233,7 +252,7 @@
     # Restore to revision 1 (2 parts).
     create_restore_file(node_another_bucket, revision=revision1, bucket="root")
     node_another_bucket.query("SYSTEM RESTART DISK s3")
-    create_table(node_another_bucket, "test", attach=True)
+    create_table(node_another_bucket, "test", attach=True, db_atomic=db_atomic, uuid=uuid)
 
     assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2)
     assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@@ -260,10 +279,14 @@
     assert node_another_bucket.query("SELECT count(*) from system.parts where table = 'test'") == '5\n'
 
 
-def test_restore_mutations(cluster):
+@pytest.mark.parametrize(
+    "db_atomic", [False, True]
+)
+def test_restore_mutations(cluster, db_atomic):
     node = cluster.instances["node"]
 
-    create_table(node, "test")
+    create_table(node, "test", db_atomic=db_atomic)
+    uuid = get_table_uuid(node, db_atomic, "test")
 
     node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096)))
     node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096, -1)))
@@ -281,7 +304,7 @@
     # Restore to revision before mutation.
     create_restore_file(node_another_bucket, revision=revision_before_mutation, bucket="root")
     node_another_bucket.query("SYSTEM RESTART DISK s3")
-    create_table(node_another_bucket, "test", attach=True)
+    create_table(node_another_bucket, "test", attach=True, db_atomic=db_atomic, uuid=uuid)
 
     assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2)
     assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@@ -315,10 +338,14 @@
     assert node_another_bucket.query("SELECT sum(counter) FROM s3.test WHERE id > 0 FORMAT Values") == "({})".format(4096)
 
 
-def test_migrate_to_restorable_schema(cluster):
+@pytest.mark.parametrize(
+    "db_atomic", [False, True]
+)
+def test_migrate_to_restorable_schema(cluster, db_atomic):
     node = cluster.instances["node_not_restorable"]
 
-    create_table(node, "test")
+    create_table(node, "test", db_atomic=db_atomic)
+    uuid = get_table_uuid(node, db_atomic, "test")
 
     node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096)))
     node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1)))
@@ -341,7 +368,7 @@
     # Restore to revision before mutation.
     create_restore_file(node_another_bucket, revision=revision, bucket="root", path="another_data")
     node_another_bucket.query("SYSTEM RESTART DISK s3")
-    create_table(node_another_bucket, "test", attach=True)
+    create_table(node_another_bucket, "test", attach=True, db_atomic=db_atomic, uuid=uuid)
 
     assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 6)
     assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
@@ -350,10 +377,14 @@
 @pytest.mark.parametrize(
     "replicated", [False, True]
 )
-def test_restore_to_detached(cluster, replicated):
+@pytest.mark.parametrize(
+    "db_atomic", [False, True]
+)
+def test_restore_to_detached(cluster, replicated, db_atomic):
     node = cluster.instances["node"]
 
-    create_table(node, "test", attach=False, replicated=replicated)
+    create_table(node, "test", attach=False, replicated=replicated, db_atomic=db_atomic)
+    uuid = get_table_uuid(node, db_atomic, "test")
 
     node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096)))
     node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1)))
@@ -374,7 +405,7 @@
 
     create_restore_file(node_another_bucket, revision=revision, bucket="root", path="data", detached=True)
     node_another_bucket.query("SYSTEM RESTART DISK s3")
-    create_table(node_another_bucket, "test", replicated=replicated)
+    create_table(node_another_bucket, "test", replicated=replicated, db_atomic=db_atomic, uuid=uuid)
 
     assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(0)
 
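A reading aid for the detached=True restores above and below: the restore lands the parts in the table's detached/ directory, so the freshly attached table is expected to be empty until partitions are attached explicitly. That is why these tests first assert a count of 0 and only then attach, along the lines of (partition value taken from the inserts above):

    # Data becomes visible only after the partition is attached.
    node_another_bucket.query("ALTER TABLE s3.test ATTACH PARTITION '2020-01-04'")
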
@@ -393,3 +424,35 @@
     assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 5)
     assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
     assert node_another_bucket.query("SELECT sum(counter) FROM s3.test FORMAT Values") == "({})".format(4096 * 5)
+
+
+@pytest.mark.parametrize(
+    "replicated", [False, True]
+)
+@pytest.mark.parametrize(
+    "db_atomic", [False, True]
+)
+def test_restore_without_detached(cluster, replicated, db_atomic):
+    node = cluster.instances["node"]
+
+    create_table(node, "test", attach=False, replicated=replicated, db_atomic=db_atomic)
+    uuid = get_table_uuid(node, db_atomic, "test")
+
+    node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 1)))
+
+    assert node.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(1)
+
+    node.query("ALTER TABLE s3.test FREEZE")
+    revision = get_revision_counter(node, 1)
+
+    node_another_bucket = cluster.instances["node_another_bucket"]
+
+    create_restore_file(node_another_bucket, revision=revision, bucket="root", path="data", detached=True)
+    node_another_bucket.query("SYSTEM RESTART DISK s3")
+    create_table(node_another_bucket, "test", replicated=replicated, db_atomic=db_atomic, uuid=uuid)
+
+    assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(0)
+
+    node_another_bucket.query("ALTER TABLE s3.test ATTACH PARTITION '2020-01-03'")
+
+    assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(1)
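The new test_restore_without_detached case closes the loop on the DiskS3.cpp fix: it freezes a single-row part, restores it into detached/ on a node where the table is created fresh (for an Atomic database the nested store/ destination exists only thanks to createDirectories above), and checks the part can still be attached; the stacked parametrize decorators run all four replicated/db_atomic combinations. One extra assertion a reader could add to make the UUID plumbing visible (a sketch, not part of the test):

    # After ATTACH with an explicit UUID, the target node must report
    # the same UUID the source node did.
    assert get_table_uuid(node_another_bucket, True, "test") == uuid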