mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-11 18:14:03 +00:00
312 lines
12 KiB
Python
312 lines
12 KiB
Python
import datetime
|
|
import logging
|
|
import time
|
|
|
|
import pytest
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
# Configure the root logger once at import time: let INFO-level records through
# and attach a default-constructed stream handler to emit them.
_root_logger = logging.getLogger()
_root_logger.setLevel(logging.INFO)
_root_logger.addHandler(logging.StreamHandler())
|
|
|
|
|
|
@pytest.fixture(scope="module")
def cluster():
    """Module-scoped fixture that yields a started two-replica cluster.

    Two instances, ``node1`` and ``node2``, are registered with the same
    ``configs/config.d/s3.xml`` main config, a per-node ``replica`` macro
    (``'1'`` and ``'2'``), and the ``with_minio`` / ``with_zookeeper`` flags set.

    Yields:
        The started ``ClickHouseCluster`` object.

    The cluster is always shut down when the fixture is torn down, including
    when registering instances or starting the cluster fails part-way.
    """
    # Build the cluster object *before* entering ``try``: if the constructor
    # raised inside the ``try`` block, ``finally`` would reference an unbound
    # local and the resulting UnboundLocalError would mask the real failure.
    cluster = ClickHouseCluster(__file__)
    try:
        for replica in ("1", "2"):
            cluster.add_instance("node" + replica, main_configs=["configs/config.d/s3.xml"],
                                 macros={'replica': replica},
                                 with_minio=True,
                                 with_zookeeper=True)
        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        yield cluster
    finally:
        cluster.shutdown()
|
|
|
|
|
|
def get_large_objects_count(cluster, size=100, folder='data'):
    """Count objects under ``<folder>/`` in the cluster's MinIO bucket with ``obj.size >= size``.

    Args:
        cluster: object exposing ``minio_client`` and ``minio_bucket``.
        size: inclusive lower bound compared against each listed object's
            ``size`` attribute (presumably bytes -- confirm against the MinIO client).
        folder: prefix to list; a trailing ``/`` is appended.

    Returns:
        The number of listed objects that meet the size threshold (0 if none).
    """
    listing = cluster.minio_client.list_objects(cluster.minio_bucket, '{}/'.format(folder))
    return sum(1 for entry in listing if entry.size >= size)
|
|
|
|
|
|
def wait_for_large_objects_count(cluster, expected, size=100, timeout=30):
    """Poll until ``get_large_objects_count(cluster, size=size)`` equals ``expected``.

    Args:
        cluster: passed through to ``get_large_objects_count``.
        expected: the object count to wait for.
        size: size threshold passed through to ``get_large_objects_count``.
        timeout: number of polling attempts; each failed attempt is followed by
            a one-second sleep, so this is a lower bound on the wait in seconds
            rather than an exact wall-clock limit.

    Raises:
        AssertionError: if the count still differs after all attempts
            (one final check is made after the last sleep).
    """
    attempts_left = timeout
    while attempts_left > 0 and get_large_objects_count(cluster, size=size) != expected:
        attempts_left -= 1
        time.sleep(1)

    if attempts_left > 0:
        # The loop stopped because the count matched, not because we ran out of attempts.
        return

    assert get_large_objects_count(cluster, size=size) == expected
|
|
|
|
|
|
def wait_for_active_parts(node, num_expected_parts, table_name, timeout=30):
    """Poll ``system.parts`` on ``node`` until the table has the expected number of active parts.

    Args:
        node: object with a ``query(sql)`` method returning the result as text.
        num_expected_parts: active part count to wait for.
        table_name: value substituted into the ``table = '...'`` filter.
        timeout: wall-clock budget in seconds, measured with ``time.monotonic()``.

    Raises:
        AssertionError: if the last observed count (0 when no query was made)
            differs from ``num_expected_parts`` once the deadline has passed.
    """
    count_query = f"select count() from system.parts where table = '{table_name}' and active"
    give_up_at = time.monotonic() + timeout
    active_parts = 0

    while time.monotonic() < give_up_at:
        active_parts = int(node.query(count_query).strip())
        if active_parts == num_expected_parts:
            return
        time.sleep(0.2)

    assert active_parts == num_expected_parts
|
|
|
|
|
|
@pytest.mark.parametrize(
    "policy", ["s3"]
)
def test_s3_zero_copy_replication(cluster, policy):
    """Insert on each replica in turn and track the count of S3 objects of size >= 100.

    Both replicas must return identical rows after each sync, and the number
    of large objects is expected to go 1 -> 2 -> 3 (after OPTIMIZE FINAL) -> 1
    (after cleanup). The table is dropped on both nodes at the end.
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]

    select_all = "SELECT * FROM s3_test order by id FORMAT Values"
    first_batch = "(0,'data'),(1,'data')"
    both_batches = "(0,'data'),(1,'data'),(2,'data'),(3,'data')"

    node1.query(
        """
        CREATE TABLE s3_test ON CLUSTER test_cluster (id UInt32, value String)
        ENGINE=ReplicatedMergeTree('/clickhouse/tables/s3_test', '{}')
        ORDER BY id
        SETTINGS storage_policy='{}'
        """
            .format('{replica}', policy)
    )

    # Write on node1, pull on node2.
    node1.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
    node2.query("SYSTEM SYNC REPLICA s3_test")
    for node in (node1, node2):
        assert node.query(select_all) == first_batch

    # Based on version 20.x - there should be only one file of size 100+ (checksums.txt), shared by both nodes
    assert get_large_objects_count(cluster) == 1

    # Write on node2, pull on node1.
    node2.query("INSERT INTO s3_test VALUES (2,'data'),(3,'data')")
    node1.query("SYSTEM SYNC REPLICA s3_test")
    for node in (node2, node1):
        assert node.query(select_all) == both_batches

    # Based on version 20.x - two parts
    wait_for_large_objects_count(cluster, 2)

    node1.query("OPTIMIZE TABLE s3_test FINAL")

    # Based on version 20.x - after the merge: two old parts and one merged part
    wait_for_large_objects_count(cluster, 3)

    # Based on version 20.x - after cleanup: only the merged part remains
    wait_for_large_objects_count(cluster, 1, timeout=60)

    for node in (node1, node2):
        node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
|
|
|
|
|
|
def test_s3_zero_copy_on_hybrid_storage(cluster):
    """Move a partition from disk ``default`` to disk ``s31`` on each replica in turn.

    After node1 has moved its copy, the total number of S3 objects is recorded;
    moving the same partition on node2 must leave that number unchanged, and
    both replicas must still return the original rows.
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]

    select_rows = "SELECT * FROM hybrid_test ORDER BY id FORMAT Values"
    select_disks = "SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values"
    expected_rows = "(0,'data'),(1,'data')"
    on_default = "('all','default')"
    on_s3 = "('all','s31')"

    node1.query(
        """
        CREATE TABLE hybrid_test ON CLUSTER test_cluster (id UInt32, value String)
        ENGINE=ReplicatedMergeTree('/clickhouse/tables/hybrid_test', '{}')
        ORDER BY id
        SETTINGS storage_policy='hybrid'
        """
            .format('{replica}')
    )

    node1.query("INSERT INTO hybrid_test VALUES (0,'data'),(1,'data')")
    node2.query("SYSTEM SYNC REPLICA hybrid_test")

    for node in (node1, node2):
        assert node.query(select_rows) == expected_rows

    # Initially the part is reported on the default disk by both replicas.
    for node, disk in ((node1, on_default), (node2, on_default)):
        assert node.query(select_disks) == disk

    node1.query("ALTER TABLE hybrid_test MOVE PARTITION ID 'all' TO DISK 's31'")

    # Only node1 has moved its copy so far.
    for node, disk in ((node1, on_s3), (node2, on_default)):
        assert node.query(select_disks) == disk

    # Total objects in S3
    s3_objects = get_large_objects_count(cluster, size=0)

    node2.query("ALTER TABLE hybrid_test MOVE PARTITION ID 'all' TO DISK 's31'")

    for node, disk in ((node1, on_s3), (node2, on_s3)):
        assert node.query(select_disks) == disk

    # Check that moving the partition on node2 created no new objects in S3
    wait_for_large_objects_count(cluster, s3_objects, size=0)

    for node in (node1, node2):
        assert node.query(select_rows) == expected_rows

    for node in (node1, node2):
        node.query("DROP TABLE IF EXISTS hybrid_test NO DELAY")
|
|
|
|
|
|
def insert_data_time(node, table, number_of_mb, time, start=0):
    """Insert consecutive ``(id, time)`` rows into ``table`` with a single INSERT.

    Args:
        node: object with a ``query(sql)`` method.
        table: table name interpolated into the INSERT statement.
        number_of_mb: controls the row count: ``int(1024 * 1024 * number_of_mb / 8) + 1``
            rows are written (presumably sized for 8-byte ids -- confirm).
        time: value written as the second column of every row
            (note: shadows the ``time`` module inside this function).
        start: id of the first row.
    """
    last_id = int((1024 * 1024 * number_of_mb) / 8) + start
    rows = [f"({row_id},{time})" for row_id in range(start, last_id + 1)]
    node.query(f"INSERT INTO {table} VALUES {','.join(rows)}")
|
|
|
|
|
|
def insert_large_data(node, table):
    """Insert three batches via ``insert_data_time`` with timestamps 7 days ago, 3 days ago and today.

    Each timestamp is ``time.mktime`` of the corresponding date's ``timetuple()``.
    The batches use ``number_of_mb`` of 1, 1 and 10 and start ids of
    0, 1024*1024 and 2*1024*1024 respectively.
    """
    id_stride = 1024 * 1024
    # (age of the timestamp in days, number_of_mb, first id)
    batches = (
        (7, 1, 0),
        (3, 1, id_stride),
        (0, 10, id_stride * 2),
    )
    for days_ago, size_mb, first_id in batches:
        day = datetime.date.today() - datetime.timedelta(days=days_ago)
        insert_data_time(node, table, size_mb, time.mktime(day.timetuple()), first_id)
|
|
|
|
|
|
@pytest.mark.parametrize(
    ("storage_policy", "large_data", "iterations"),
    [
        ("tiered", False, 10),
        ("tiered_copy", False, 10),
        ("tiered", True, 3),
        ("tiered_copy", True, 3),
    ]
)
def test_s3_zero_copy_with_ttl_move(cluster, storage_policy, large_data, iterations):
    """Repeatedly create, fill, optimize, verify and drop a table with a ``TTL ... TO VOLUME 'external'`` rule.

    Every iteration checks that both replicas report the full row count
    (and, for the small data set, the exact ids), i.e. that no rows are lost.
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]
    replicas = (node1, node2)

    # Start from a clean state on both replicas.
    for node in replicas:
        node.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")

    for _ in range(iterations):
        node1.query(
            """
            CREATE TABLE ttl_move_test ON CLUSTER test_cluster (d UInt64, d1 DateTime)
            ENGINE=ReplicatedMergeTree('/clickhouse/tables/ttl_move_test', '{}')
            ORDER BY d
            TTL d1 + INTERVAL 2 DAY TO VOLUME 'external'
            SETTINGS storage_policy='{}'
            """
                .format('{replica}', storage_policy)
        )

        if large_data:
            insert_large_data(node1, 'ttl_move_test')
        else:
            # One row older than the 2-day TTL interval and one newer.
            node1.query("INSERT INTO ttl_move_test VALUES (10, now() - INTERVAL 3 DAY)")
            node1.query("INSERT INTO ttl_move_test VALUES (11, now() - INTERVAL 1 DAY)")

        node1.query("OPTIMIZE TABLE ttl_move_test FINAL")
        node2.query("SYSTEM SYNC REPLICA ttl_move_test")

        expected_count = "(1572867)" if large_data else "(2)"
        for node in replicas:
            assert node.query("SELECT count() FROM ttl_move_test FORMAT Values") == expected_count
        if not large_data:
            for node in replicas:
                assert node.query("SELECT d FROM ttl_move_test ORDER BY d FORMAT Values") == "(10),(11)"

        for node in replicas:
            node.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
|
|
|
|
|
|
@pytest.mark.parametrize(
    ("large_data", "iterations"),
    [
        (False, 10),
        (True, 3),
    ]
)
def test_s3_zero_copy_with_ttl_delete(cluster, large_data, iterations):
    """Repeatedly create, fill, optimize, verify and drop a table with a ``TTL d1 + INTERVAL 2 DAY`` rule.

    Every iteration checks that both replicas report the same reduced row
    count after OPTIMIZE FINAL (and, for the small data set, that only id 11
    is left).
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]
    replicas = (node1, node2)

    # Start from a clean state on both replicas.
    for node in replicas:
        node.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")

    for _ in range(iterations):
        node1.query(
            """
            CREATE TABLE ttl_delete_test ON CLUSTER test_cluster (d UInt64, d1 DateTime)
            ENGINE=ReplicatedMergeTree('/clickhouse/tables/ttl_delete_test', '{}')
            ORDER BY d
            TTL d1 + INTERVAL 2 DAY
            SETTINGS storage_policy='tiered'
            """
                .format('{replica}')
        )

        if large_data:
            insert_large_data(node1, 'ttl_delete_test')
        else:
            # One row older than the 2-day TTL interval and one newer.
            node1.query("INSERT INTO ttl_delete_test VALUES (10, now() - INTERVAL 3 DAY)")
            node1.query("INSERT INTO ttl_delete_test VALUES (11, now() - INTERVAL 1 DAY)")

        node1.query("OPTIMIZE TABLE ttl_delete_test FINAL")
        node2.query("SYSTEM SYNC REPLICA ttl_delete_test")

        expected_count = "(1310721)" if large_data else "(1)"
        for node in replicas:
            assert node.query("SELECT count() FROM ttl_delete_test FORMAT Values") == expected_count
        if not large_data:
            for node in replicas:
                assert node.query("SELECT d FROM ttl_delete_test ORDER BY d FORMAT Values") == "(11)"

        for node in replicas:
            node.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
|
|
|
|
|
|
def test_s3_zero_copy_concurrent_merge(cluster):
    """Let both replicas merge the same two parts and check that the data stays intact.

    Merges are stopped, two parts are inserted, a column with a sleeping
    default is added to slow the merge down, and merges are started again.
    Afterwards each replica must have a single active part and
    ``sum(id)`` must equal 1600.
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]
    replicas = (node1, node2)

    for node in replicas:
        node.query("DROP TABLE IF EXISTS concurrent_merge NO DELAY")

    # The table is created on each replica separately (no ON CLUSTER here).
    for node in replicas:
        node.query(
            """
            CREATE TABLE concurrent_merge (id UInt64)
            ENGINE=ReplicatedMergeTree('/clickhouse/tables/concurrent_merge', '{replica}')
            ORDER BY id
            SETTINGS index_granularity=2, storage_policy='s3', remote_fs_execute_merges_on_single_replica_time_threshold=1
            """
        )

    for node in replicas:
        node.query("system stop merges")

    # This will generate two parts with 20 granules each
    node1.query("insert into concurrent_merge select number from numbers(40)")
    node1.query("insert into concurrent_merge select number + 1 from numbers(40)")

    wait_for_active_parts(node2, 2, 'concurrent_merge')

    # The merge will materialize the default column; it should sleep on every granule
    # and take 20 * 2 * 0.1 = 4 sec.
    node1.query("alter table concurrent_merge add column x UInt32 default sleep(0.1)")

    for node in replicas:
        node.query("system start merges")

    # Now, the merge should start.
    # Because of remote_fs_execute_merges_on_single_replica_time_threshold=1,
    # only one replica will start the merge instantly.
    # The other replica should wait for 1 sec and then start it as well.
    # That should probably cause a data race in s3 storage.
    # For now, it does not happen (every blob has a random name, so we just end up with duplicated data)
    node1.query("optimize table concurrent_merge final")

    for node in replicas:
        wait_for_active_parts(node, 1, 'concurrent_merge')

    for node in replicas:
        assert node.query('select sum(id) from concurrent_merge').strip() == '1600'
|