mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 02:21:59 +00:00
Speedup test_merge_tree_s3_restore using 'SYSTEM RESTART DISK s3'
This commit is contained in:
parent
37c637cc34
commit
64950d0b50
@ -66,11 +66,11 @@ def generate_values(date_str, count, sign=1):
|
||||
return ",".join(["('{}',{},'{}',{})".format(x, y, z, 0) for x, y, z in data])
|
||||
|
||||
|
||||
def create_table(node, table_name, replicated=False):
|
||||
def create_table(node, table_name, attach=False, replicated=False):
|
||||
node.query("CREATE DATABASE IF NOT EXISTS s3 ENGINE = Ordinary")
|
||||
|
||||
create_table_statement = """
|
||||
CREATE TABLE s3.{table_name} {on_cluster} (
|
||||
{create} TABLE s3.{table_name} {on_cluster} (
|
||||
dt Date,
|
||||
id Int64,
|
||||
data String,
|
||||
@ -83,7 +83,8 @@ def create_table(node, table_name, replicated=False):
|
||||
storage_policy='s3',
|
||||
old_parts_lifetime=600,
|
||||
index_granularity=512
|
||||
""".format(table_name=table_name,
|
||||
""".format(create="ATTACH" if attach else "CREATE",
|
||||
table_name=table_name,
|
||||
on_cluster="ON CLUSTER '{}'".format(node.name) if replicated else "",
|
||||
engine="ReplicatedMergeTree('/clickhouse/tables/{cluster}/test', '{replica}')" if replicated else "MergeTree()")
|
||||
|
||||
@ -107,6 +108,7 @@ def drop_shadow_information(node):
|
||||
|
||||
|
||||
def create_restore_file(node, revision=None, bucket=None, path=None, detached=None):
|
||||
node.exec_in_container(['bash', '-c', 'mkdir -p /var/lib/clickhouse/disks/s3/'], user='root')
|
||||
node.exec_in_container(['bash', '-c', 'touch /var/lib/clickhouse/disks/s3/restore'], user='root')
|
||||
|
||||
add_restore_option = 'echo -en "{}={}\n" >> /var/lib/clickhouse/disks/s3/restore'
|
||||
@ -150,17 +152,18 @@ def drop_table(cluster):
|
||||
def test_full_restore(cluster, replicated):
|
||||
node = cluster.instances["node"]
|
||||
|
||||
create_table(node, "test", replicated)
|
||||
create_table(node, "test", attach=False, replicated=replicated)
|
||||
|
||||
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096)))
|
||||
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1)))
|
||||
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096)))
|
||||
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096, -1)))
|
||||
|
||||
node.stop_clickhouse()
|
||||
node.query("DETACH TABLE s3.test")
|
||||
drop_s3_metadata(node)
|
||||
create_restore_file(node)
|
||||
node.start_clickhouse()
|
||||
node.query("SYSTEM RESTART DISK s3")
|
||||
node.query("ATTACH TABLE s3.test")
|
||||
|
||||
assert node.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
|
||||
assert node.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
|
||||
@ -184,22 +187,18 @@ def test_restore_another_bucket_path(cluster):
|
||||
|
||||
node_another_bucket = cluster.instances["node_another_bucket"]
|
||||
|
||||
create_table(node_another_bucket, "test")
|
||||
|
||||
node_another_bucket.stop_clickhouse()
|
||||
create_restore_file(node_another_bucket, bucket="root")
|
||||
node_another_bucket.start_clickhouse()
|
||||
node_another_bucket.query("SYSTEM RESTART DISK s3")
|
||||
create_table(node_another_bucket, "test", attach=True)
|
||||
|
||||
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
|
||||
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
|
||||
|
||||
node_another_bucket_path = cluster.instances["node_another_bucket_path"]
|
||||
|
||||
create_table(node_another_bucket_path, "test")
|
||||
|
||||
node_another_bucket_path.stop_clickhouse()
|
||||
create_restore_file(node_another_bucket_path, bucket="root2", path="data")
|
||||
node_another_bucket_path.start_clickhouse()
|
||||
node_another_bucket_path.query("SYSTEM RESTART DISK s3")
|
||||
create_table(node_another_bucket_path, "test", attach=True)
|
||||
|
||||
assert node_another_bucket_path.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
|
||||
assert node_another_bucket_path.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
|
||||
@ -234,36 +233,30 @@ def test_restore_different_revisions(cluster):
|
||||
|
||||
node_another_bucket = cluster.instances["node_another_bucket"]
|
||||
|
||||
create_table(node_another_bucket, "test")
|
||||
|
||||
# Restore to revision 1 (2 parts).
|
||||
node_another_bucket.stop_clickhouse()
|
||||
drop_s3_metadata(node_another_bucket)
|
||||
purge_s3(cluster, cluster.minio_bucket_2)
|
||||
create_restore_file(node_another_bucket, revision=revision1, bucket="root")
|
||||
node_another_bucket.start_clickhouse()
|
||||
node_another_bucket.query("SYSTEM RESTART DISK s3")
|
||||
create_table(node_another_bucket, "test", attach=True)
|
||||
|
||||
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2)
|
||||
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
|
||||
assert node_another_bucket.query("SELECT count(*) from system.parts where table = 'test'") == '2\n'
|
||||
|
||||
# Restore to revision 2 (4 parts).
|
||||
node_another_bucket.stop_clickhouse()
|
||||
drop_s3_metadata(node_another_bucket)
|
||||
purge_s3(cluster, cluster.minio_bucket_2)
|
||||
node_another_bucket.query("DETACH TABLE s3.test")
|
||||
create_restore_file(node_another_bucket, revision=revision2, bucket="root")
|
||||
node_another_bucket.start_clickhouse()
|
||||
node_another_bucket.query("SYSTEM RESTART DISK s3")
|
||||
node_another_bucket.query("ATTACH TABLE s3.test")
|
||||
|
||||
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
|
||||
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
|
||||
assert node_another_bucket.query("SELECT count(*) from system.parts where table = 'test'") == '4\n'
|
||||
|
||||
# Restore to revision 3 (4 parts + 1 merged).
|
||||
node_another_bucket.stop_clickhouse()
|
||||
drop_s3_metadata(node_another_bucket)
|
||||
purge_s3(cluster, cluster.minio_bucket_2)
|
||||
node_another_bucket.query("DETACH TABLE s3.test")
|
||||
create_restore_file(node_another_bucket, revision=revision3, bucket="root")
|
||||
node_another_bucket.start_clickhouse()
|
||||
node_another_bucket.query("SYSTEM RESTART DISK s3")
|
||||
node_another_bucket.query("ATTACH TABLE s3.test")
|
||||
|
||||
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 4)
|
||||
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
|
||||
@ -288,25 +281,20 @@ def test_restore_mutations(cluster):
|
||||
|
||||
node_another_bucket = cluster.instances["node_another_bucket"]
|
||||
|
||||
create_table(node_another_bucket, "test")
|
||||
|
||||
# Restore to revision before mutation.
|
||||
node_another_bucket.stop_clickhouse()
|
||||
drop_s3_metadata(node_another_bucket)
|
||||
purge_s3(cluster, cluster.minio_bucket_2)
|
||||
create_restore_file(node_another_bucket, revision=revision_before_mutation, bucket="root")
|
||||
node_another_bucket.start_clickhouse()
|
||||
node_another_bucket.query("SYSTEM RESTART DISK s3")
|
||||
create_table(node_another_bucket, "test", attach=True)
|
||||
|
||||
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2)
|
||||
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
|
||||
assert node_another_bucket.query("SELECT sum(counter) FROM s3.test FORMAT Values") == "({})".format(0)
|
||||
|
||||
# Restore to revision after mutation.
|
||||
node_another_bucket.stop_clickhouse()
|
||||
drop_s3_metadata(node_another_bucket)
|
||||
purge_s3(cluster, cluster.minio_bucket_2)
|
||||
node_another_bucket.query("DETACH TABLE s3.test")
|
||||
create_restore_file(node_another_bucket, revision=revision_after_mutation, bucket="root")
|
||||
node_another_bucket.start_clickhouse()
|
||||
node_another_bucket.query("SYSTEM RESTART DISK s3")
|
||||
node_another_bucket.query("ATTACH TABLE s3.test")
|
||||
|
||||
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 2)
|
||||
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
|
||||
@ -315,12 +303,11 @@ def test_restore_mutations(cluster):
|
||||
|
||||
# Restore to revision in the middle of mutation.
|
||||
# Unfinished mutation should be completed after table startup.
|
||||
node_another_bucket.stop_clickhouse()
|
||||
drop_s3_metadata(node_another_bucket)
|
||||
purge_s3(cluster, cluster.minio_bucket_2)
|
||||
node_another_bucket.query("DETACH TABLE s3.test")
|
||||
revision = (revision_before_mutation + revision_after_mutation) // 2
|
||||
create_restore_file(node_another_bucket, revision=revision, bucket="root")
|
||||
node_another_bucket.start_clickhouse()
|
||||
node_another_bucket.query("SYSTEM RESTART DISK s3")
|
||||
node_another_bucket.query("ATTACH TABLE s3.test")
|
||||
|
||||
# Wait for unfinished mutation completion.
|
||||
time.sleep(3)
|
||||
@ -342,7 +329,6 @@ def test_migrate_to_restorable_schema(cluster):
|
||||
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096, -1)))
|
||||
|
||||
replace_config("<send_metadata>false</send_metadata>", "<send_metadata>true</send_metadata>")
|
||||
|
||||
node.restart_clickhouse()
|
||||
|
||||
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-06', 4096)))
|
||||
@ -355,14 +341,10 @@ def test_migrate_to_restorable_schema(cluster):
|
||||
|
||||
node_another_bucket = cluster.instances["node_another_bucket"]
|
||||
|
||||
create_table(node_another_bucket, "test")
|
||||
|
||||
# Restore to revision before mutation.
|
||||
node_another_bucket.stop_clickhouse()
|
||||
drop_s3_metadata(node_another_bucket)
|
||||
purge_s3(cluster, cluster.minio_bucket_2)
|
||||
create_restore_file(node_another_bucket, revision=revision, bucket="root", path="another_data")
|
||||
node_another_bucket.start_clickhouse()
|
||||
node_another_bucket.query("SYSTEM RESTART DISK s3")
|
||||
create_table(node_another_bucket, "test", attach=True)
|
||||
|
||||
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 6)
|
||||
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
|
||||
@ -374,7 +356,7 @@ def test_migrate_to_restorable_schema(cluster):
|
||||
def test_restore_to_detached(cluster, replicated):
|
||||
node = cluster.instances["node"]
|
||||
|
||||
create_table(node, "test", replicated)
|
||||
create_table(node, "test", attach=False, replicated=replicated)
|
||||
|
||||
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096)))
|
||||
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1)))
|
||||
@ -393,11 +375,9 @@ def test_restore_to_detached(cluster, replicated):
|
||||
|
||||
node_another_bucket = cluster.instances["node_another_bucket"]
|
||||
|
||||
create_table(node_another_bucket, "test", replicated)
|
||||
|
||||
node_another_bucket.stop_clickhouse()
|
||||
create_restore_file(node_another_bucket, revision=revision, bucket="root", path="data", detached=True)
|
||||
node_another_bucket.start_clickhouse()
|
||||
node_another_bucket.query("SYSTEM RESTART DISK s3")
|
||||
create_table(node_another_bucket, "test", replicated=replicated)
|
||||
|
||||
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(0)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user