ClickHouse/tests/integration/test_merge_tree_load_parts/test.py
2023-05-07 16:33:07 +00:00

257 lines
8.7 KiB
Python

import pytest
import helpers.client
import helpers.cluster
import time
from helpers.corrupt_part_data_on_disk import corrupt_part_data_on_disk
cluster = helpers.cluster.ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/fast_background_pool.xml"],
with_zookeeper=True,
stay_alive=True,
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/fast_background_pool.xml"],
with_zookeeper=True,
stay_alive=True,
)
node3 = cluster.add_instance(
"node3",
with_zookeeper=True,
stay_alive=True,
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_merge_tree_load_parts(started_cluster):
node1.query(
"""
CREATE TABLE mt_load_parts (pk UInt32, id UInt32, s String)
ENGINE = MergeTree ORDER BY id PARTITION BY pk"""
)
node1.query("SYSTEM STOP MERGES mt_load_parts")
for i in range(20):
node1.query(
f"INSERT INTO mt_load_parts VALUES (44, {i}, randomPrintableASCII(10))"
)
node1.restart_clickhouse(kill=True)
for i in range(1, 21):
assert node1.contains_in_log(f"Loading Active part 44_{i}_{i}_0")
node1.query("OPTIMIZE TABLE mt_load_parts FINAL")
node1.restart_clickhouse(kill=True)
node1.query("SYSTEM WAIT LOADING PARTS mt_load_parts")
assert node1.contains_in_log("Loading Active part 44_1_20")
for i in range(1, 21):
assert not node1.contains_in_log(f"Loading Active part 44_{i}_{i}_0")
assert node1.contains_in_log(f"Loading Outdated part 44_{i}_{i}_0")
assert node1.query("SELECT count() FROM mt_load_parts") == "20\n"
assert (
node1.query(
"SELECT count() FROM system.parts WHERE table = 'mt_load_parts' AND active"
)
== "1\n"
)
node1.query("ALTER TABLE mt_load_parts MODIFY SETTING old_parts_lifetime = 1")
node1.query("DETACH TABLE mt_load_parts")
node1.query("ATTACH TABLE mt_load_parts")
node1.query("SYSTEM WAIT LOADING PARTS mt_load_parts")
table_path = node1.query(
"SELECT data_paths[1] FROM system.tables WHERE table = 'mt_load_parts'"
).strip()
part_dirs = node1.exec_in_container(["bash", "-c", f"ls {table_path}"], user="root")
part_dirs = list(
set(part_dirs.strip().split("\n")) - {"detached", "format_version.txt"}
)
MAX_RETRY = 10
part_dirs_ok = False
for _ in range(MAX_RETRY):
part_dirs = node1.exec_in_container(
["bash", "-c", f"ls {table_path}"], user="root"
)
part_dirs = list(
set(part_dirs.strip().split("\n")) - {"detached", "format_version.txt"}
)
part_dirs_ok = len(part_dirs) == 1 and part_dirs[0].startswith("44_1_20")
if part_dirs_ok:
break
time.sleep(2)
assert part_dirs_ok
def test_merge_tree_load_parts_corrupted(started_cluster):
for i, node in enumerate([node1, node2]):
node.query(
f"""
CREATE TABLE mt_load_parts_2 (pk UInt32, id UInt32, s String)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/mt_load_parts_2', '{i}') ORDER BY id PARTITION BY pk"""
)
"""min-max blocks in created parts: 1_1_0, 2_2_0, 1_2_1, 3_3_0, 1_3_2"""
for partition in [111, 222, 333]:
node1.query(
f"INSERT INTO mt_load_parts_2 VALUES ({partition}, 0, randomPrintableASCII(10))"
)
node1.query(
f"INSERT INTO mt_load_parts_2 VALUES ({partition}, 1, randomPrintableASCII(10))"
)
node1.query(f"OPTIMIZE TABLE mt_load_parts_2 PARTITION {partition} FINAL")
node1.query(
f"INSERT INTO mt_load_parts_2 VALUES ({partition}, 2, randomPrintableASCII(10))"
)
node1.query(f"OPTIMIZE TABLE mt_load_parts_2 PARTITION {partition} FINAL")
node2.query("SYSTEM SYNC REPLICA mt_load_parts_2", timeout=30)
def get_part_name(node, partition, min_block, max_block):
return node.query(
f"""
SELECT name FROM system.parts
WHERE table = 'mt_load_parts_2'
AND partition = '{partition}'
AND min_block_number = {min_block}
AND max_block_number = {max_block}"""
).strip()
corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 111, 0, 2))
corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 222, 0, 2))
corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 222, 0, 1))
corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 333, 0, 1))
corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 333, 2, 2))
node1.restart_clickhouse(kill=True)
node1.query("SYSTEM WAIT LOADING PARTS mt_load_parts_2")
def check_parts_loading(node, partition, loaded, failed, skipped):
for min_block, max_block in loaded:
part_name = f"{partition}_{min_block}_{max_block}"
assert node.contains_in_log(f"Loading Active part {part_name}")
assert node.contains_in_log(f"Finished loading Active part {part_name}")
for min_block, max_block in failed:
part_name = f"{partition}_{min_block}_{max_block}"
assert node.contains_in_log(f"Loading Active part {part_name}")
assert not node.contains_in_log(f"Finished loading Active part {part_name}")
for min_block, max_block in skipped:
part_name = f"{partition}_{min_block}_{max_block}"
assert not node.contains_in_log(f"Loading Active part {part_name}")
assert not node.contains_in_log(f"Finished loading Active part {part_name}")
check_parts_loading(
node1, 111, loaded=[(0, 1), (2, 2)], failed=[(0, 2)], skipped=[(0, 0), (1, 1)]
)
check_parts_loading(
node1, 222, loaded=[(0, 0), (1, 1), (2, 2)], failed=[(0, 2), (0, 1)], skipped=[]
)
check_parts_loading(
node1, 333, loaded=[(0, 2)], failed=[], skipped=[(0, 0), (1, 1), (2, 2), (0, 1)]
)
node1.query("SYSTEM SYNC REPLICA mt_load_parts_2", timeout=30)
node1.query("OPTIMIZE TABLE mt_load_parts_2 FINAL")
node1.query("SYSTEM SYNC REPLICA mt_load_parts_2", timeout=30)
assert (
node1.query(
"""
SELECT pk, count() FROM mt_load_parts_2
GROUP BY pk ORDER BY pk"""
)
== "111\t3\n222\t3\n333\t3\n"
)
assert (
node1.query(
"""
SELECT partition, count()
FROM system.parts WHERE table = 'mt_load_parts_2' AND active
GROUP BY partition ORDER BY partition"""
)
== "111\t1\n222\t1\n333\t1\n"
)
def test_merge_tree_load_parts_filesystem_error(started_cluster):
if node3.is_built_with_sanitizer() or node3.is_debug_build():
pytest.skip(
"Skip with debug build and sanitizers. \
This test intentionally triggers LOGICAL_ERROR which leads to crash with those builds"
)
node3.query(
"""
CREATE TABLE mt_load_parts (id UInt32)
ENGINE = MergeTree ORDER BY id
SETTINGS index_granularity_bytes = 0"""
)
node3.query("SYSTEM STOP MERGES mt_load_parts")
for i in range(2):
node3.query(f"INSERT INTO mt_load_parts VALUES ({i})")
# We want to somehow check that exception thrown on part creation is handled during part loading.
# It can be a filesystem exception triggered at initialization of part storage but it hard
# to trigger it because it should be an exception on stat/listDirectory.
# The most easy way to trigger such exception is to use chmod but clickhouse server
# is run with root user in integration test and this won't work. So let's do
# some stupid things: create a table without adaptive granularity and change mark
# extensions of data files in part to make clickhouse think that it's a compact part which
# cannot be created in such table. This will trigger a LOGICAL_ERROR on part creation.
def corrupt_part(table, part_name):
part_path = node3.query(
"SELECT path FROM system.parts WHERE table = '{}' and name = '{}'".format(
table, part_name
)
).strip()
node3.exec_in_container(
["bash", "-c", f"mv {part_path}id.cmrk {part_path}id.cmrk3"],
privileged=True,
)
corrupt_part("mt_load_parts", "all_1_1_0")
node3.restart_clickhouse(kill=True)
assert node3.query("SELECT * FROM mt_load_parts") == "1\n"
assert (
node3.query(
"SELECT name FROM system.detached_parts WHERE table = 'mt_load_parts'"
)
== "broken-on-start_all_1_1_0\n"
)