ClickHouse/tests/integration/test_s3_zero_copy_replication/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

760 lines
24 KiB
Python
Raw Normal View History

import datetime
import logging
import time
import pytest
from helpers.cluster import ClickHouseCluster
# Set the root logger threshold to INFO and attach a stream handler so the
# progress messages logged while the cluster starts are emitted.
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
2022-09-28 12:38:11 +00:00
# Module-wide cluster object: the `started_cluster` fixture adds instances to
# it, starts it and shuts it down; tests read nodes from `cluster.instances`.
cluster = ClickHouseCluster(__file__)
@pytest.fixture(scope="module")
def started_cluster():
    """Start the two-replica cluster once per module and shut it down afterwards.

    Registers ``node1`` and ``node2`` (replica macros ``1`` and ``2``), both
    with MinIO and ZooKeeper enabled, starts the cluster and yields it.
    ``cluster.shutdown()`` runs even when registration or start-up fails.
    """
    try:
        for replica_id in ("1", "2"):
            cluster.add_instance(
                "node" + replica_id,
                main_configs=["configs/config.d/s3.xml"],
                macros={"replica": replica_id},
                with_minio=True,
                with_zookeeper=True,
            )

        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        yield cluster
    finally:
        cluster.shutdown()
def get_large_objects_count(cluster, size=100, folder="data"):
    """Return how many objects under ``<folder>/`` are at least ``size`` bytes.

    The listing is recursive over ``cluster.minio_bucket``; entries whose
    ``size`` attribute is ``None`` are not counted.
    """
    listing = cluster.minio_client.list_objects(
        cluster.minio_bucket, "{}/".format(folder), recursive=True
    )
    return sum(
        1 for entry in listing if entry.size is not None and entry.size >= size
    )
2023-07-07 11:05:42 +00:00
def check_objects_exist(cluster, object_list, folder="data"):
    """Stat every non-empty name in ``object_list`` under ``<folder>/``.

    Falsy entries (``""``, ``None``) are skipped. Nothing is returned; any
    exception raised by ``stat_object`` propagates to the caller.
    """
    client = cluster.minio_client
    for name in filter(None, object_list):
        client.stat_object(cluster.minio_bucket, "{}/{}".format(folder, name))
def check_objects_not_exisis(cluster, object_list, folder="data"):
    """Verify that none of the named objects is present under ``<folder>/``.

    Falsy entries (``""``, ``None``) are skipped. For every other entry a
    ``stat_object`` call must fail with an error whose text contains
    ``NoSuchKey``.

    Raises:
        AssertionError: if an object still exists, or if the stat call fails
            for any reason other than a missing key.
    """
    minio = cluster.minio_client
    for obj in object_list:
        if not obj:
            continue
        try:
            minio.stat_object(cluster.minio_bucket, "{}/{}".format(folder, obj))
        except Exception as error:
            # Only a missing key proves the object is gone; anything else
            # (network failure, access denied, ...) is a real test failure.
            if "NoSuchKey" not in str(error):
                raise AssertionError(
                    "Unexpected error while checking object {}: {}".format(obj, error)
                ) from error
        else:
            # Raised explicitly so the check still runs under `python -O`.
            raise AssertionError("Object {} should not exist".format(obj))
def wait_for_large_objects_count(cluster, expected, size=100, timeout=30):
    """Poll until the number of objects of at least ``size`` bytes equals ``expected``.

    The count is taken with ``get_large_objects_count`` about once per second.
    ``timeout`` is a wall-clock budget in seconds measured with a monotonic
    clock, so time spent listing the bucket counts against it. At least one
    check is always made.

    Raises:
        AssertionError: if the count still differs once the budget is used up.
    """
    deadline = time.monotonic() + timeout
    while True:
        actual = get_large_objects_count(cluster, size=size)
        if actual == expected:
            return
        if time.monotonic() >= deadline:
            break
        time.sleep(1)

    raise AssertionError(
        "Expected {} objects of size >= {}, found {} after {} seconds".format(
            expected, size, actual, timeout
        )
    )
2021-12-13 13:34:04 +00:00
def wait_for_active_parts(node, num_expected_parts, table_name, timeout=30):
    """Poll ``system.parts`` until ``table_name`` has the expected number of active parts.

    The node is queried every 0.2 seconds until ``timeout`` seconds have
    elapsed on a monotonic clock. The query always runs at least once, so a
    zero or negative ``timeout`` still checks the real part count.

    Raises:
        AssertionError: if the count still differs when the deadline passes.
    """
    deadline = time.monotonic() + timeout
    count_query = (
        "select count() from system.parts where table = '{}' and active".format(
            table_name
        )
    )

    while True:
        num_parts = int(node.query(count_query).strip())
        if num_parts == num_expected_parts:
            return
        if time.monotonic() >= deadline:
            break
        time.sleep(0.2)

    raise AssertionError(
        "Table {} has {} active parts, expected {}".format(
            table_name, num_parts, num_expected_parts
        )
    )
# Other tests change what `get_large_objects_count` returns, so this case has to run first.
@pytest.mark.order(0)
@pytest.mark.parametrize("policy", ["s3"])
def test_s3_zero_copy_replication(started_cluster, policy):
    """Insert on each replica in turn and check both the data and the S3 object counts.

    Both replicas must return the same rows, and the number of large objects
    must match the number of parts that exist at each stage of the scenario.
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]
    select_all = "SELECT * FROM s3_test order by id FORMAT Values"

    create_table = """
        CREATE TABLE s3_test ON CLUSTER test_cluster (id UInt32, value String)
        ENGINE=ReplicatedMergeTree('/clickhouse/tables/s3_test', '{}')
        ORDER BY id
        SETTINGS storage_policy='{}'
        """.format(
        "{replica}", policy
    )
    node1.query(create_table)

    # First batch is written through node1 and fetched by node2.
    node1.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
    node2.query("SYSTEM SYNC REPLICA s3_test", timeout=30)
    first_batch = "(0,'data'),(1,'data')"
    for node in (node1, node2):
        assert node.query(select_all) == first_batch

    # Based on version 21.x - should be only 1 file with size 100+ (checksums.txt), used by both nodes
    assert get_large_objects_count(cluster) == 1

    # Second batch goes the other way round: written on node2, fetched by node1.
    node2.query("INSERT INTO s3_test VALUES (2,'data'),(3,'data')")
    node1.query("SYSTEM SYNC REPLICA s3_test", timeout=30)
    both_batches = "(0,'data'),(1,'data'),(2,'data'),(3,'data')"
    for node in (node2, node1):
        assert node.query(select_all) == both_batches

    # Based on version 21.x - two parts
    wait_for_large_objects_count(cluster, 2)

    node1.query("OPTIMIZE TABLE s3_test FINAL")

    # Based on version 21.x - after merge, two old parts and one merged
    wait_for_large_objects_count(cluster, 3)

    # Based on version 21.x - after cleanup - only one merged part
    wait_for_large_objects_count(cluster, 1, timeout=60)

    for node in (node1, node2):
        node.query("DROP TABLE IF EXISTS s3_test SYNC")
2022-07-15 10:44:37 +00:00
@pytest.mark.skip(reason="Test is flaky (and never was stable)")
def test_s3_zero_copy_on_hybrid_storage(started_cluster):
    """Move a partition to the S3 disk on each replica and check no extra objects appear.

    The part starts on the ``default`` disk of both replicas. It is moved to
    ``s31`` on node1 first, then on node2; the second move must not change the
    total number of objects in the bucket.
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]

    select_rows = "SELECT * FROM hybrid_test ORDER BY id FORMAT Values"
    select_disks = "SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values"
    move_to_s3 = "ALTER TABLE hybrid_test MOVE PARTITION ID 'all' TO DISK 's31'"
    expected_rows = "(0,'data'),(1,'data')"
    on_default = "('all','default')"
    on_s3 = "('all','s31')"

    node1.query(
        """
        CREATE TABLE hybrid_test ON CLUSTER test_cluster (id UInt32, value String)
        ENGINE=ReplicatedMergeTree('/clickhouse/tables/hybrid_test', '{}')
        ORDER BY id
        SETTINGS storage_policy='hybrid',temporary_directories_lifetime=1
        """.format(
            "{replica}"
        )
    )

    node1.query("INSERT INTO hybrid_test VALUES (0,'data'),(1,'data')")
    node2.query("SYSTEM SYNC REPLICA hybrid_test", timeout=30)

    assert node1.query(select_rows) == expected_rows
    assert node2.query(select_rows) == expected_rows

    # Freshly inserted data lives on the local disk of both replicas.
    assert node1.query(select_disks) == on_default
    assert node2.query(select_disks) == on_default

    # Moving on node1 must not affect where node2 keeps its copy.
    node1.query(move_to_s3)
    assert node1.query(select_disks) == on_s3
    assert node2.query(select_disks) == on_default

    # Total objects in S3
    s3_objects = get_large_objects_count(cluster, size=0)

    node2.query(move_to_s3)
    assert node1.query(select_disks) == on_s3
    assert node2.query(select_disks) == on_s3

    # The move on node2 must not have created any new objects in S3.
    wait_for_large_objects_count(cluster, s3_objects, size=0)

    assert node1.query(select_rows) == expected_rows
    assert node2.query(select_rows) == expected_rows

    node1.query("DROP TABLE IF EXISTS hybrid_test SYNC")
    node2.query("DROP TABLE IF EXISTS hybrid_test SYNC")
def insert_data_time(node, table, number_of_mb, time, start=0):
    """Insert ``(key, time)`` rows with consecutive keys in a single INSERT.

    Keys run from ``start`` and there are ``int(1024 * 1024 * number_of_mb / 8) + 1``
    of them; every row carries the same ``time`` value.
    """
    row_total = int((1024 * 1024 * number_of_mb) / 8) + 1
    rows = [f"({key},{time})" for key in range(start, start + row_total)]
    node.query(f"INSERT INTO {table} VALUES {','.join(rows)}")
def insert_large_data(node, table):
    """Insert three batches via ``insert_data_time``, timestamped 7, 3 and 0 days ago.

    The first two batches use ``number_of_mb=1`` and the last ``number_of_mb=10``;
    their keys start at 0, 1024 * 1024 and 2 * 1024 * 1024 respectively.
    """
    # (age in days, number_of_mb, first key) for each batch, oldest first.
    batches = (
        (7, 1, 0),
        (3, 1, 1024 * 1024),
        (0, 10, 1024 * 1024 * 2),
    )
    for days_ago, megabytes, first_key in batches:
        day = datetime.date.today() - datetime.timedelta(days=days_ago)
        insert_data_time(
            node, table, megabytes, time.mktime(day.timetuple()), first_key
        )
@pytest.mark.parametrize(
    ("storage_policy", "large_data", "iterations"),
    [
        ("tiered", False, 10),
        ("tiered_copy", False, 10),
        ("tiered", True, 3),
        ("tiered_copy", True, 3),
    ],
)
def test_s3_zero_copy_with_ttl_move(
    started_cluster, storage_policy, large_data, iterations
):
    """Repeatedly create, fill and drop a table with a TTL move to the 'external' volume.

    After each OPTIMIZE and replica sync both replicas must report the full
    row count, i.e. the TTL move must not lose any rows.
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]
    replicas = (node1, node2)

    drop_table = "DROP TABLE IF EXISTS ttl_move_test SYNC"
    count_rows = "SELECT count() FROM ttl_move_test FORMAT Values"
    create_table = """
            CREATE TABLE ttl_move_test ON CLUSTER test_cluster (d UInt64, d1 DateTime)
            ENGINE=ReplicatedMergeTree('/clickhouse/tables/ttl_move_test', '{}')
            ORDER BY d
            TTL d1 + INTERVAL 2 DAY TO VOLUME 'external'
            SETTINGS storage_policy='{}'
            """.format(
        "{replica}", storage_policy
    )

    for node in replicas:
        node.query(drop_table)

    for _ in range(iterations):
        node1.query(create_table)

        if large_data:
            insert_large_data(node1, "ttl_move_test")
            expected_count = "(1572867)"
        else:
            node1.query("INSERT INTO ttl_move_test VALUES (10, now() - INTERVAL 3 DAY)")
            node1.query("INSERT INTO ttl_move_test VALUES (11, now() - INTERVAL 1 DAY)")
            expected_count = "(2)"

        node1.query("OPTIMIZE TABLE ttl_move_test FINAL")
        node2.query("SYSTEM SYNC REPLICA ttl_move_test", timeout=30)

        for node in replicas:
            assert node.query(count_rows) == expected_count

        # The exact keys are only checked for the two-row data set.
        if not large_data:
            for node in replicas:
                assert (
                    node.query("SELECT d FROM ttl_move_test ORDER BY d FORMAT Values")
                    == "(10),(11)"
                )

        for node in replicas:
            node.query(drop_table)
@pytest.mark.parametrize(
    ("large_data", "iterations"),
    [
        (False, 10),
        (True, 3),
    ],
)
def test_s3_zero_copy_with_ttl_delete(started_cluster, large_data, iterations):
    """Repeatedly create, fill and drop a table whose rows expire after two days.

    After each OPTIMIZE and replica sync both replicas must hold only the
    rows that are younger than the TTL.
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]
    replicas = (node1, node2)

    drop_table = "DROP TABLE IF EXISTS ttl_delete_test SYNC"
    count_rows = "SELECT count() FROM ttl_delete_test FORMAT Values"
    create_table = """
            CREATE TABLE ttl_delete_test ON CLUSTER test_cluster (d UInt64, d1 DateTime)
            ENGINE=ReplicatedMergeTree('/clickhouse/tables/ttl_delete_test', '{}')
            ORDER BY d
            TTL d1 + INTERVAL 2 DAY
            SETTINGS storage_policy='tiered'
            """.format(
        "{replica}"
    )

    for node in replicas:
        node.query(drop_table)

    for _ in range(iterations):
        node1.query(create_table)

        if large_data:
            insert_large_data(node1, "ttl_delete_test")
            expected_count = "(1310721)"
        else:
            node1.query(
                "INSERT INTO ttl_delete_test VALUES (10, now() - INTERVAL 3 DAY)"
            )
            node1.query(
                "INSERT INTO ttl_delete_test VALUES (11, now() - INTERVAL 1 DAY)"
            )
            expected_count = "(1)"

        node1.query("OPTIMIZE TABLE ttl_delete_test FINAL")

        for node in replicas:
            node.query("SYSTEM SYNC REPLICA ttl_delete_test", timeout=30)

        for node in replicas:
            assert node.query(count_rows) == expected_count

        # Only the row younger than the TTL (d = 11) may remain in the small data set.
        if not large_data:
            for node in replicas:
                assert (
                    node.query("SELECT d FROM ttl_delete_test ORDER BY d FORMAT Values")
                    == "(11)"
                )

        for node in replicas:
            node.query(drop_table)
2021-12-01 13:12:40 +00:00
def wait_mutations(node, table, seconds):
    """Wait until ``system.mutations`` reports no unfinished mutation for ``table``.

    Sleeps one second up front, then checks up to ``seconds`` times with a
    one-second pause after each unsuccessful check. A final check is asserted,
    so an ``AssertionError`` is raised if mutations are still pending.
    """
    pending_query = (
        f"SELECT count() FROM system.mutations WHERE table='{table}' AND is_done=0"
    )

    time.sleep(1)
    attempts_left = seconds
    while attempts_left > 0:
        attempts_left -= 1
        if node.query(pending_query) == "0\n":
            return
        time.sleep(1)

    assert node.query(pending_query) == "0\n"
2021-12-01 13:12:40 +00:00
2022-09-28 12:38:11 +00:00
2022-09-28 11:09:48 +00:00
def wait_for_clean_old_parts(node, table, seconds):
    """Wait until ``system.parts`` lists no inactive part for ``table``.

    Sleeps one second up front, then checks up to ``seconds`` times with a
    one-second pause after each unsuccessful check. A final check is asserted,
    so an ``AssertionError`` is raised if inactive parts remain.
    """
    inactive_query = (
        f"SELECT count() FROM system.parts WHERE table='{table}' AND active=0"
    )

    time.sleep(1)
    attempts_left = seconds
    while attempts_left > 0:
        attempts_left -= 1
        if node.query(inactive_query) == "0\n":
            return
        time.sleep(1)

    assert node.query(inactive_query) == "0\n"
2021-12-01 13:12:40 +00:00
2022-09-28 12:38:11 +00:00
2022-06-08 12:09:59 +00:00
def s3_zero_copy_unfreeze_base(cluster, unfreeze_query_template):
    """Shared scenario: frozen backups on two replicas keep S3 objects alive until both are unfrozen.

    ``unfreeze_query_template`` is the statement prefix; the quoted backup
    name is appended to it to build each UNFREEZE query.
    """
    replica_a = cluster.instances["node1"]
    replica_b = cluster.instances["node2"]
    replicas = (replica_a, replica_b)

    for replica in replicas:
        replica.query("DROP TABLE IF EXISTS unfreeze_test SYNC")

    replica_a.query(
        """
        CREATE TABLE unfreeze_test ON CLUSTER test_cluster (d UInt64)
        ENGINE=ReplicatedMergeTree('/clickhouse/tables/unfreeze_test', '{}')
        ORDER BY d
        SETTINGS storage_policy='s3'
        """.format(
            "{replica}"
        )
    )

    replica_a.query("INSERT INTO unfreeze_test VALUES (0)")
    wait_for_active_parts(replica_b, 1, "unfreeze_test")

    # Each replica takes its own named backup of the same part.
    replica_a.query("ALTER TABLE unfreeze_test FREEZE WITH NAME 'freeze_backup1'")
    replica_b.query("ALTER TABLE unfreeze_test FREEZE WITH NAME 'freeze_backup2'")
    for replica in replicas:
        wait_mutations(replica, "unfreeze_test", 10)

    frozen_a = replica_a.get_backuped_s3_objects("s31", "freeze_backup1")
    frozen_b = replica_b.get_backuped_s3_objects("s31", "freeze_backup2")
    assert frozen_a == frozen_b
    check_objects_exist(cluster, frozen_a)

    # Truncating the table must leave the backed-up objects untouched.
    replica_a.query("TRUNCATE TABLE unfreeze_test")
    replica_b.query("SYSTEM SYNC REPLICA unfreeze_test", timeout=30)

    after_truncate_a = replica_a.get_backuped_s3_objects("s31", "freeze_backup1")
    after_truncate_b = replica_b.get_backuped_s3_objects("s31", "freeze_backup2")
    assert frozen_a == after_truncate_a
    assert frozen_a == after_truncate_b
    check_objects_exist(cluster, after_truncate_a)

    # After the first unfreeze the objects are still expected to exist.
    replica_a.query(f"{unfreeze_query_template} 'freeze_backup1'")
    wait_mutations(replica_a, "unfreeze_test", 10)
    check_objects_exist(cluster, after_truncate_b)

    # After the second unfreeze the objects are expected to be gone.
    replica_b.query(f"{unfreeze_query_template} 'freeze_backup2'")
    wait_mutations(replica_b, "unfreeze_test", 10)
    check_objects_not_exisis(cluster, after_truncate_b)

    for replica in replicas:
        replica.query("DROP TABLE IF EXISTS unfreeze_test SYNC")
2021-12-01 13:12:40 +00:00
2022-09-28 12:38:11 +00:00
def test_s3_zero_copy_unfreeze_alter(started_cluster):
    """Run the shared unfreeze scenario with the ``ALTER TABLE ... UNFREEZE`` form."""
    unfreeze_prefix = "ALTER TABLE unfreeze_test UNFREEZE WITH NAME"
    s3_zero_copy_unfreeze_base(cluster, unfreeze_prefix)
2022-09-28 12:38:11 +00:00
def test_s3_zero_copy_unfreeze_system(started_cluster):
    """Run the shared unfreeze scenario with the ``SYSTEM UNFREEZE`` form."""
    unfreeze_prefix = "SYSTEM UNFREEZE WITH NAME"
    s3_zero_copy_unfreeze_base(cluster, unfreeze_prefix)
def s3_zero_copy_drop_detached(cluster, unfreeze_query_template):
    """Shared scenario: S3 objects of a detached partition vanish only after both replicas drop it.

    ``unfreeze_query_template`` is the statement prefix; the quoted backup
    name is appended to it to build each UNFREEZE query.
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]

    def wait_both():
        # Let pending mutations finish on node1, then on node2.
        wait_mutations(node1, "drop_detached_test", 10)
        wait_mutations(node2, "drop_detached_test", 10)

    def sync(node):
        node.query("SYSTEM SYNC REPLICA drop_detached_test", timeout=30)

    def drop_detached(node, partition):
        node.query(
            f"ALTER TABLE drop_detached_test DROP DETACHED PARTITION '{partition}'",
            settings={"allow_drop_detached": 1},
        )

    node1.query("DROP TABLE IF EXISTS drop_detached_test SYNC")
    node2.query("DROP TABLE IF EXISTS drop_detached_test SYNC")

    node1.query(
        """
        CREATE TABLE drop_detached_test ON CLUSTER test_cluster (d UInt64)
        ENGINE=ReplicatedMergeTree('/clickhouse/tables/drop_detached_test', '{}')
        ORDER BY d PARTITION BY d
        SETTINGS storage_policy='s3'
        """.format(
            "{replica}"
        )
    )

    # One backup after the first insert and another after the second.
    node1.query("INSERT INTO drop_detached_test VALUES (0)")
    node1.query("ALTER TABLE drop_detached_test FREEZE WITH NAME 'detach_backup1'")
    node1.query("INSERT INTO drop_detached_test VALUES (1)")
    node1.query("ALTER TABLE drop_detached_test FREEZE WITH NAME 'detach_backup2'")
    sync(node2)

    objects1 = node1.get_backuped_s3_objects("s31", "detach_backup1")
    objects2 = node1.get_backuped_s3_objects("s31", "detach_backup2")
    # Objects listed for the second backup but not for the first.
    objects_diff = list(set(objects2) - set(objects1))

    node1.query(f"{unfreeze_query_template} 'detach_backup2'")
    node1.query(f"{unfreeze_query_template} 'detach_backup1'")

    node1.query("ALTER TABLE drop_detached_test DETACH PARTITION '0'")
    node1.query("ALTER TABLE drop_detached_test DETACH PARTITION '1'")
    sync(node2)
    wait_both()

    check_objects_exist(cluster, objects1)
    check_objects_exist(cluster, objects2)

    # Dropping partition 1 on node2 only: everything must still be there.
    drop_detached(node2, "1")
    sync(node1)
    wait_both()

    check_objects_exist(cluster, objects1)
    check_objects_exist(cluster, objects2)

    # Dropping partition 1 on node1 as well: only the objects unique to backup 2 go away.
    drop_detached(node1, "1")
    sync(node2)
    wait_both()

    check_objects_exist(cluster, objects1)
    check_objects_not_exisis(cluster, objects_diff)

    # Dropping partition 0 on node1 only: backup 1 objects must still be there.
    drop_detached(node1, "0")
    sync(node2)
    wait_both()

    check_objects_exist(cluster, objects1)

    # Dropping partition 0 on node2 as well: backup 1 objects must be gone.
    drop_detached(node2, "0")
    sync(node1)
    wait_both()

    check_objects_not_exisis(cluster, objects1)
2021-12-21 14:27:54 +00:00
2022-09-28 12:38:11 +00:00
def test_s3_zero_copy_drop_detached_alter(started_cluster):
    """Run the shared drop-detached scenario with the ``ALTER TABLE ... UNFREEZE`` form."""
    unfreeze_prefix = "ALTER TABLE drop_detached_test UNFREEZE WITH NAME"
    s3_zero_copy_drop_detached(cluster, unfreeze_prefix)
2022-09-28 12:38:11 +00:00
def test_s3_zero_copy_drop_detached_system(started_cluster):
    """Run the shared drop-detached scenario with the ``SYSTEM UNFREEZE`` form."""
    unfreeze_prefix = "SYSTEM UNFREEZE WITH NAME"
    s3_zero_copy_drop_detached(cluster, unfreeze_prefix)
2022-09-28 12:38:11 +00:00
def test_s3_zero_copy_concurrent_merge(started_cluster):
    """Let both replicas run the same slow merge and check the data stays intact.

    The table sets ``remote_fs_execute_merges_on_single_replica_time_threshold=1``.
    After the merge each replica must hold a single active part and the sum
    of ``id`` must be 1600 on both.
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]
    replicas = (node1, node2)

    for replica in replicas:
        replica.query("DROP TABLE IF EXISTS concurrent_merge SYNC")

    create_table = """
            CREATE TABLE concurrent_merge (id UInt64)
            ENGINE=ReplicatedMergeTree('/clickhouse/tables/concurrent_merge', '{replica}')
            ORDER BY id
            SETTINGS index_granularity=2, storage_policy='s3', remote_fs_execute_merges_on_single_replica_time_threshold=1
            """
    for replica in replicas:
        replica.query(create_table)

    for replica in replicas:
        replica.query("system stop merges")

    # This will generate two parts with 20 granules each
    node1.query("insert into concurrent_merge select number from numbers(40)")
    node1.query("insert into concurrent_merge select number + 1 from numbers(40)")

    wait_for_active_parts(node2, 2, "concurrent_merge")

    # The merge has to materialize the default column, sleeping on every granule,
    # so it should take about 20 * 2 * 0.1 = 4 sec.
    node1.query("alter table concurrent_merge add column x UInt32 default sleep(0.1)")

    for replica in replicas:
        replica.query("system start merges")

    # Now the merge should start.
    # Because of remote_fs_execute_merges_on_single_replica_time_threshold=1,
    # only one replica starts merging right away; the other one should wait
    # for 1 sec and then start the same merge too.
    # That could be expected to cause a data race on the S3 storage. For now it
    # does not happen: every blob gets a random name, so the data is merely duplicated.
    node1.query("optimize table concurrent_merge final")

    for replica in replicas:
        wait_for_active_parts(replica, 1, "concurrent_merge")

    for replica in replicas:
        assert replica.query("select sum(id) from concurrent_merge").strip() == "1600"
2022-09-28 11:09:48 +00:00
2022-09-28 12:38:11 +00:00
def test_s3_zero_copy_keeps_data_after_mutation(started_cluster):
    """Dropping the table on one replica after a mutation must not delete shared S3 objects.

    The objects referenced by the surviving replica have to exist until that
    replica drops the table as well; afterwards they have to be gone.
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]
    replicas = (node1, node2)

    for replica in replicas:
        replica.query("DROP TABLE IF EXISTS zero_copy_mutation SYNC")

    create_table = """
        CREATE TABLE zero_copy_mutation (id UInt64, value1 String, value2 String, value3 String)
        ENGINE=ReplicatedMergeTree('/clickhouse/tables/zero_copy_mutation', '{replica}')
        ORDER BY id
        PARTITION BY (id % 4)
        SETTINGS storage_policy='s3',
        old_parts_lifetime=1000
        """
    for replica in replicas:
        replica.query(create_table)

    node1.query(
        """
        INSERT INTO zero_copy_mutation
        SELECT * FROM generateRandom('id UInt64, value1 String, value2 String, value3 String') limit 1000000
        """
    )

    wait_for_active_parts(node2, 4, "zero_copy_mutation")

    initial_objects = node1.get_table_objects("zero_copy_mutation")
    check_objects_exist(cluster, initial_objects)

    node1.query(
        """
        ALTER TABLE zero_copy_mutation
        ADD COLUMN valueX String MATERIALIZED value1
        """
    )

    node1.query(
        """
        ALTER TABLE zero_copy_mutation
        MATERIALIZE COLUMN valueX
        """
    )

    for replica in replicas:
        wait_mutations(replica, "zero_copy_mutation", 10)

    # If the bug is present, at least one node has metadata with incorrect
    # ref_count values. Which node it is depends on the order in which the
    # mutations were executed. Finding it would require knowledge of the
    # internal metadata structure, which may change in the future, so the test
    # does not try to. As a consequence the test may sometimes pass even when
    # the bug is present.
    dropped_first = node1
    survivor = node2

    survivor_objects = survivor.get_table_objects("zero_copy_mutation")
    check_objects_exist(cluster, survivor_objects)

    dropped_first.query(
        """
        ALTER TABLE zero_copy_mutation
        DETACH PARTITION '0'
        """
    )

    dropped_first.query(
        """
        ALTER TABLE zero_copy_mutation
        ATTACH PARTITION '0'
        """
    )

    for replica in replicas:
        wait_mutations(replica, "zero_copy_mutation", 10)

    dropped_first.query(
        """
        DROP TABLE zero_copy_mutation SYNC
        """
    )

    # Give the object removal time to happen.
    time.sleep(10)

    # The surviving replica must still be able to read its data.
    survivor.query(
        """
        SELECT count() FROM zero_copy_mutation
        WHERE value3 LIKE '%ab%'
        """
    )

    check_objects_exist(cluster, survivor_objects)

    survivor.query(
        """
        DROP TABLE zero_copy_mutation SYNC
        """
    )

    # Give the object removal time to happen.
    time.sleep(10)

    check_objects_not_exisis(cluster, survivor_objects)