ClickHouse/tests/integration/test_s3_zero_copy_replication/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

623 lines
21 KiB
Python
Raw Normal View History

import datetime
import logging
import time
import pytest
from helpers.cluster import ClickHouseCluster
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node1",
main_configs=["configs/config.d/s3.xml"],
macros={"replica": "1"},
with_minio=True,
with_zookeeper=True,
)
cluster.add_instance(
"node2",
main_configs=["configs/config.d/s3.xml"],
macros={"replica": "2"},
with_minio=True,
with_zookeeper=True,
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def get_large_objects_count(cluster, size=100, folder="data"):
minio = cluster.minio_client
counter = 0
for obj in minio.list_objects(
cluster.minio_bucket, "{}/".format(folder), recursive=True
):
if obj.size is not None and obj.size >= size:
counter = counter + 1
return counter
def check_objects_exisis(cluster, object_list, folder="data"):
minio = cluster.minio_client
for obj in object_list:
if obj:
minio.stat_object(cluster.minio_bucket, "{}/{}".format(folder, obj))
def check_objects_not_exisis(cluster, object_list, folder="data"):
minio = cluster.minio_client
for obj in object_list:
if obj:
try:
minio.stat_object(cluster.minio_bucket, "{}/{}".format(folder, obj))
except Exception as error:
assert "NoSuchKey" in str(error)
else:
assert False, "Object {} should not be exists".format(obj)
def wait_for_large_objects_count(cluster, expected, size=100, timeout=30):
while timeout > 0:
2021-05-17 13:01:08 +00:00
if get_large_objects_count(cluster, size=size) == expected:
return
timeout -= 1
time.sleep(1)
2021-05-17 13:01:08 +00:00
assert get_large_objects_count(cluster, size=size) == expected
2021-12-13 13:34:04 +00:00
def wait_for_active_parts(node, num_expected_parts, table_name, timeout=30):
deadline = time.monotonic() + timeout
num_parts = 0
while time.monotonic() < deadline:
num_parts_str = node.query(
"select count() from system.parts where table = '{}' and active".format(
table_name
)
)
2021-12-13 13:34:04 +00:00
num_parts = int(num_parts_str.strip())
if num_parts == num_expected_parts:
return
time.sleep(0.2)
assert num_parts == num_expected_parts
# Result of `get_large_objects_count` can be changed in other tests, so run this case at the beginning
@pytest.mark.order(0)
@pytest.mark.parametrize("policy", ["s3"])
def test_s3_zero_copy_replication(cluster, policy):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query(
"""
CREATE TABLE s3_test ON CLUSTER test_cluster (id UInt32, value String)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/s3_test', '{}')
ORDER BY id
SETTINGS storage_policy='{}'
""".format(
"{replica}", policy
)
)
node1.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
2022-06-30 20:51:27 +00:00
node2.query("SYSTEM SYNC REPLICA s3_test", timeout=30)
assert (
node1.query("SELECT * FROM s3_test order by id FORMAT Values")
== "(0,'data'),(1,'data')"
)
assert (
node2.query("SELECT * FROM s3_test order by id FORMAT Values")
== "(0,'data'),(1,'data')"
)
2021-09-20 13:09:20 +00:00
# Based on version 21.x - should be only 1 file with size 100+ (checksums.txt), used by both nodes
assert get_large_objects_count(cluster) == 1
node2.query("INSERT INTO s3_test VALUES (2,'data'),(3,'data')")
2022-06-30 20:51:27 +00:00
node1.query("SYSTEM SYNC REPLICA s3_test", timeout=30)
assert (
node2.query("SELECT * FROM s3_test order by id FORMAT Values")
== "(0,'data'),(1,'data'),(2,'data'),(3,'data')"
)
assert (
node1.query("SELECT * FROM s3_test order by id FORMAT Values")
== "(0,'data'),(1,'data'),(2,'data'),(3,'data')"
)
2021-08-02 15:42:16 +00:00
# Based on version 21.x - two parts
wait_for_large_objects_count(cluster, 2)
node1.query("OPTIMIZE TABLE s3_test FINAL")
2021-08-02 15:42:16 +00:00
# Based on version 21.x - after merge, two old parts and one merged
wait_for_large_objects_count(cluster, 3)
2021-08-02 15:42:16 +00:00
# Based on version 21.x - after cleanup - only one merged part
wait_for_large_objects_count(cluster, 1, timeout=60)
node1.query("DROP TABLE IF EXISTS s3_test NO DELAY")
node2.query("DROP TABLE IF EXISTS s3_test NO DELAY")
2022-07-15 10:44:37 +00:00
@pytest.mark.skip(reason="Test is flaky (and never was stable)")
def test_s3_zero_copy_on_hybrid_storage(cluster):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query(
"""
CREATE TABLE hybrid_test ON CLUSTER test_cluster (id UInt32, value String)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/hybrid_test', '{}')
ORDER BY id
SETTINGS storage_policy='hybrid'
""".format(
"{replica}"
)
)
node1.query("INSERT INTO hybrid_test VALUES (0,'data'),(1,'data')")
2022-06-30 20:51:27 +00:00
node2.query("SYSTEM SYNC REPLICA hybrid_test", timeout=30)
assert (
node1.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values")
== "(0,'data'),(1,'data')"
)
assert (
node2.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values")
== "(0,'data'),(1,'data')"
)
assert (
node1.query(
"SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values"
)
== "('all','default')"
)
assert (
node2.query(
"SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values"
)
== "('all','default')"
)
node1.query("ALTER TABLE hybrid_test MOVE PARTITION ID 'all' TO DISK 's31'")
assert (
node1.query(
"SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values"
)
== "('all','s31')"
)
assert (
node2.query(
"SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values"
)
== "('all','default')"
)
# Total objects in S3
s3_objects = get_large_objects_count(cluster, size=0)
node2.query("ALTER TABLE hybrid_test MOVE PARTITION ID 'all' TO DISK 's31'")
assert (
node1.query(
"SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values"
)
== "('all','s31')"
)
assert (
node2.query(
"SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values"
)
== "('all','s31')"
)
# Check that after moving partition on node2 no new obects on s3
wait_for_large_objects_count(cluster, s3_objects, size=0)
assert (
node1.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values")
== "(0,'data'),(1,'data')"
)
assert (
node2.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values")
== "(0,'data'),(1,'data')"
)
node1.query("DROP TABLE IF EXISTS hybrid_test NO DELAY")
node2.query("DROP TABLE IF EXISTS hybrid_test NO DELAY")
def insert_data_time(node, table, number_of_mb, time, start=0):
values = ",".join(
f"({x},{time})"
for x in range(start, int((1024 * 1024 * number_of_mb) / 8) + start + 1)
)
node.query(f"INSERT INTO {table} VALUES {values}")
def insert_large_data(node, table):
tm = time.mktime((datetime.date.today() - datetime.timedelta(days=7)).timetuple())
insert_data_time(node, table, 1, tm, 0)
tm = time.mktime((datetime.date.today() - datetime.timedelta(days=3)).timetuple())
insert_data_time(node, table, 1, tm, 1024 * 1024)
tm = time.mktime(datetime.date.today().timetuple())
insert_data_time(node, table, 10, tm, 1024 * 1024 * 2)
@pytest.mark.parametrize(
("storage_policy", "large_data", "iterations"),
[
("tiered", False, 10),
("tiered_copy", False, 10),
("tiered", True, 3),
("tiered_copy", True, 3),
],
)
def test_s3_zero_copy_with_ttl_move(cluster, storage_policy, large_data, iterations):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
node2.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
for i in range(iterations):
node1.query(
"""
CREATE TABLE ttl_move_test ON CLUSTER test_cluster (d UInt64, d1 DateTime)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/ttl_move_test', '{}')
ORDER BY d
TTL d1 + INTERVAL 2 DAY TO VOLUME 'external'
SETTINGS storage_policy='{}'
""".format(
"{replica}", storage_policy
)
)
if large_data:
insert_large_data(node1, "ttl_move_test")
else:
node1.query("INSERT INTO ttl_move_test VALUES (10, now() - INTERVAL 3 DAY)")
node1.query("INSERT INTO ttl_move_test VALUES (11, now() - INTERVAL 1 DAY)")
node1.query("OPTIMIZE TABLE ttl_move_test FINAL")
2022-06-30 20:51:27 +00:00
node2.query("SYSTEM SYNC REPLICA ttl_move_test", timeout=30)
if large_data:
assert (
node1.query("SELECT count() FROM ttl_move_test FORMAT Values")
== "(1572867)"
)
assert (
node2.query("SELECT count() FROM ttl_move_test FORMAT Values")
== "(1572867)"
)
else:
assert (
node1.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(2)"
)
assert (
node2.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(2)"
)
assert (
node1.query("SELECT d FROM ttl_move_test ORDER BY d FORMAT Values")
== "(10),(11)"
)
assert (
node2.query("SELECT d FROM ttl_move_test ORDER BY d FORMAT Values")
== "(10),(11)"
)
node1.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
node2.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
@pytest.mark.parametrize(
("large_data", "iterations"),
[
(False, 10),
(True, 3),
],
)
def test_s3_zero_copy_with_ttl_delete(cluster, large_data, iterations):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
node2.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
for i in range(iterations):
node1.query(
"""
CREATE TABLE ttl_delete_test ON CLUSTER test_cluster (d UInt64, d1 DateTime)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/ttl_delete_test', '{}')
ORDER BY d
TTL d1 + INTERVAL 2 DAY
SETTINGS storage_policy='tiered'
""".format(
"{replica}"
)
)
if large_data:
insert_large_data(node1, "ttl_delete_test")
else:
node1.query(
"INSERT INTO ttl_delete_test VALUES (10, now() - INTERVAL 3 DAY)"
)
node1.query(
"INSERT INTO ttl_delete_test VALUES (11, now() - INTERVAL 1 DAY)"
)
node1.query("OPTIMIZE TABLE ttl_delete_test FINAL")
2022-03-24 17:25:16 +00:00
2022-06-30 20:51:27 +00:00
node1.query("SYSTEM SYNC REPLICA ttl_delete_test", timeout=30)
node2.query("SYSTEM SYNC REPLICA ttl_delete_test", timeout=30)
if large_data:
assert (
node1.query("SELECT count() FROM ttl_delete_test FORMAT Values")
== "(1310721)"
)
assert (
node2.query("SELECT count() FROM ttl_delete_test FORMAT Values")
== "(1310721)"
)
else:
assert (
node1.query("SELECT count() FROM ttl_delete_test FORMAT Values")
== "(1)"
)
assert (
node2.query("SELECT count() FROM ttl_delete_test FORMAT Values")
== "(1)"
)
assert (
node1.query("SELECT d FROM ttl_delete_test ORDER BY d FORMAT Values")
== "(11)"
)
assert (
node2.query("SELECT d FROM ttl_delete_test ORDER BY d FORMAT Values")
== "(11)"
)
node1.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
node2.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
2021-12-01 13:12:40 +00:00
def wait_mutations(node, table, seconds):
time.sleep(1)
2021-12-01 13:12:40 +00:00
while seconds > 0:
seconds -= 1
mutations = node.query(
f"SELECT count() FROM system.mutations WHERE table='{table}' AND is_done=0"
)
if mutations == "0\n":
2021-12-01 13:12:40 +00:00
return
time.sleep(1)
mutations = node.query(
f"SELECT count() FROM system.mutations WHERE table='{table}' AND is_done=0"
)
assert mutations == "0\n"
2021-12-01 13:12:40 +00:00
2022-06-08 12:09:59 +00:00
def s3_zero_copy_unfreeze_base(cluster, unfreeze_query_template):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query("DROP TABLE IF EXISTS unfreeze_test NO DELAY")
node2.query("DROP TABLE IF EXISTS unfreeze_test NO DELAY")
node1.query(
"""
CREATE TABLE unfreeze_test ON CLUSTER test_cluster (d UInt64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/unfreeze_test', '{}')
ORDER BY d
SETTINGS storage_policy='s3'
""".format(
"{replica}"
)
)
node1.query("INSERT INTO unfreeze_test VALUES (0)")
2021-12-01 13:12:40 +00:00
node1.query("ALTER TABLE unfreeze_test FREEZE WITH NAME 'freeze_backup1'")
node2.query("ALTER TABLE unfreeze_test FREEZE WITH NAME 'freeze_backup2'")
wait_mutations(node1, "unfreeze_test", 10)
wait_mutations(node2, "unfreeze_test", 10)
2021-12-01 13:12:40 +00:00
objects01 = node1.get_backuped_s3_objects("s31", "freeze_backup1")
objects02 = node2.get_backuped_s3_objects("s31", "freeze_backup2")
assert objects01 == objects02
check_objects_exisis(cluster, objects01)
node1.query("TRUNCATE TABLE unfreeze_test")
2022-06-30 20:51:27 +00:00
node2.query("SYSTEM SYNC REPLICA unfreeze_test", timeout=30)
2021-12-01 13:12:40 +00:00
objects11 = node1.get_backuped_s3_objects("s31", "freeze_backup1")
objects12 = node2.get_backuped_s3_objects("s31", "freeze_backup2")
assert objects01 == objects11
assert objects01 == objects12
check_objects_exisis(cluster, objects11)
2022-06-08 12:09:59 +00:00
node1.query(f"{unfreeze_query_template} 'freeze_backup1'")
2021-12-01 13:12:40 +00:00
wait_mutations(node1, "unfreeze_test", 10)
check_objects_exisis(cluster, objects12)
2022-06-08 12:09:59 +00:00
node2.query(f"{unfreeze_query_template} 'freeze_backup2'")
2021-12-01 13:12:40 +00:00
wait_mutations(node2, "unfreeze_test", 10)
check_objects_not_exisis(cluster, objects12)
2021-12-01 13:12:40 +00:00
node1.query("DROP TABLE IF EXISTS unfreeze_test NO DELAY")
node2.query("DROP TABLE IF EXISTS unfreeze_test NO DELAY")
2022-06-08 12:09:59 +00:00
def test_s3_zero_copy_unfreeze_alter(cluster):
s3_zero_copy_unfreeze_base(cluster, "ALTER TABLE unfreeze_test UNFREEZE WITH NAME")
def test_s3_zero_copy_unfreeze_system(cluster):
s3_zero_copy_unfreeze_base(cluster, "SYSTEM UNFREEZE WITH NAME")
def s3_zero_copy_drop_detached(cluster, unfreeze_query_template):
2021-12-01 13:12:40 +00:00
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query("DROP TABLE IF EXISTS drop_detached_test NO DELAY")
node2.query("DROP TABLE IF EXISTS drop_detached_test NO DELAY")
node1.query(
"""
CREATE TABLE drop_detached_test ON CLUSTER test_cluster (d UInt64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/drop_detached_test', '{}')
ORDER BY d PARTITION BY d
SETTINGS storage_policy='s3'
""".format(
"{replica}"
)
2021-12-01 13:12:40 +00:00
)
node1.query("INSERT INTO drop_detached_test VALUES (0)")
node1.query("ALTER TABLE drop_detached_test FREEZE WITH NAME 'detach_backup1'")
node1.query("INSERT INTO drop_detached_test VALUES (1)")
node1.query("ALTER TABLE drop_detached_test FREEZE WITH NAME 'detach_backup2'")
2022-06-30 20:51:27 +00:00
node2.query("SYSTEM SYNC REPLICA drop_detached_test", timeout=30)
2021-12-01 13:12:40 +00:00
objects1 = node1.get_backuped_s3_objects("s31", "detach_backup1")
objects2 = node1.get_backuped_s3_objects("s31", "detach_backup2")
objects_diff = list(set(objects2) - set(objects1))
2022-06-08 12:09:59 +00:00
node1.query(f"{unfreeze_query_template} 'detach_backup2'")
node1.query(f"{unfreeze_query_template} 'detach_backup1'")
2021-12-01 13:12:40 +00:00
node1.query("ALTER TABLE drop_detached_test DETACH PARTITION '0'")
node1.query("ALTER TABLE drop_detached_test DETACH PARTITION '1'")
2022-06-30 20:51:27 +00:00
node2.query("SYSTEM SYNC REPLICA drop_detached_test", timeout=30)
2022-02-10 16:50:21 +00:00
2021-12-01 13:12:40 +00:00
wait_mutations(node1, "drop_detached_test", 10)
wait_mutations(node2, "drop_detached_test", 10)
check_objects_exisis(cluster, objects1)
check_objects_exisis(cluster, objects2)
node2.query(
"ALTER TABLE drop_detached_test DROP DETACHED PARTITION '1'",
settings={"allow_drop_detached": 1},
)
2022-06-30 20:51:27 +00:00
node1.query("SYSTEM SYNC REPLICA drop_detached_test", timeout=30)
2021-12-01 13:12:40 +00:00
wait_mutations(node1, "drop_detached_test", 10)
wait_mutations(node2, "drop_detached_test", 10)
check_objects_exisis(cluster, objects1)
check_objects_exisis(cluster, objects2)
node1.query(
"ALTER TABLE drop_detached_test DROP DETACHED PARTITION '1'",
settings={"allow_drop_detached": 1},
)
2022-06-30 20:51:27 +00:00
node2.query("SYSTEM SYNC REPLICA drop_detached_test", timeout=30)
2021-12-01 13:12:40 +00:00
wait_mutations(node1, "drop_detached_test", 10)
wait_mutations(node2, "drop_detached_test", 10)
check_objects_exisis(cluster, objects1)
check_objects_not_exisis(cluster, objects_diff)
node1.query(
"ALTER TABLE drop_detached_test DROP DETACHED PARTITION '0'",
settings={"allow_drop_detached": 1},
)
2022-06-30 20:51:27 +00:00
node2.query("SYSTEM SYNC REPLICA drop_detached_test", timeout=30)
2021-12-01 13:12:40 +00:00
wait_mutations(node1, "drop_detached_test", 10)
wait_mutations(node2, "drop_detached_test", 10)
check_objects_exisis(cluster, objects1)
node2.query(
"ALTER TABLE drop_detached_test DROP DETACHED PARTITION '0'",
settings={"allow_drop_detached": 1},
)
2022-06-30 20:51:27 +00:00
node1.query("SYSTEM SYNC REPLICA drop_detached_test", timeout=30)
2021-12-01 13:12:40 +00:00
wait_mutations(node1, "drop_detached_test", 10)
wait_mutations(node2, "drop_detached_test", 10)
check_objects_not_exisis(cluster, objects1)
2021-12-21 14:27:54 +00:00
2022-06-08 12:09:59 +00:00
def test_s3_zero_copy_drop_detached_alter(cluster):
s3_zero_copy_drop_detached(
cluster, "ALTER TABLE drop_detached_test UNFREEZE WITH NAME"
)
def test_s3_zero_copy_drop_detached_system(cluster):
s3_zero_copy_drop_detached(cluster, "SYSTEM UNFREEZE WITH NAME")
2021-12-13 13:34:04 +00:00
def test_s3_zero_copy_concurrent_merge(cluster):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query("DROP TABLE IF EXISTS concurrent_merge NO DELAY")
node2.query("DROP TABLE IF EXISTS concurrent_merge NO DELAY")
for node in (node1, node2):
node.query(
"""
2021-12-13 13:34:04 +00:00
CREATE TABLE concurrent_merge (id UInt64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/concurrent_merge', '{replica}')
ORDER BY id
SETTINGS index_granularity=2, storage_policy='s3', remote_fs_execute_merges_on_single_replica_time_threshold=1
"""
)
2021-12-13 13:34:04 +00:00
node1.query("system stop merges")
node2.query("system stop merges")
# This will generate two parts with 20 granules each
node1.query("insert into concurrent_merge select number from numbers(40)")
node1.query("insert into concurrent_merge select number + 1 from numbers(40)")
wait_for_active_parts(node2, 2, "concurrent_merge")
2021-12-13 13:34:04 +00:00
# Merge will materialize default column, it should sleep every granule and take 20 * 2 * 0.1 = 4 sec.
node1.query("alter table concurrent_merge add column x UInt32 default sleep(0.1)")
node1.query("system start merges")
node2.query("system start merges")
# Now, the merge should start.
# Because of remote_fs_execute_merges_on_single_replica_time_threshold=1,
# only one replica will start merge instantly.
# The other replica should wait for 1 sec and also start it.
# That should probably cause a data race at s3 storage.
# For now, it does not happen (every blob has a random name, and we just have a duplicating data)
node1.query("optimize table concurrent_merge final")
wait_for_active_parts(node1, 1, "concurrent_merge")
wait_for_active_parts(node2, 1, "concurrent_merge")
2021-12-13 13:34:04 +00:00
for node in (node1, node2):
assert node.query("select sum(id) from concurrent_merge").strip() == "1600"