mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 19:02:04 +00:00
275 lines
9.4 KiB
Python
275 lines
9.4 KiB
Python
import pytest
|
|
import helpers.client
|
|
import helpers.cluster
|
|
import time
|
|
from helpers.corrupt_part_data_on_disk import corrupt_part_data_on_disk
|
|
|
|
|
|
cluster = helpers.cluster.ClickHouseCluster(__file__)

# Keyword arguments shared by all three instances.
_COMMON_INSTANCE_KWARGS = {"with_zookeeper": True, "stay_alive": True}

# node1 and node2 both use the fast background pool config; node1
# additionally loads compat.xml.
node1 = cluster.add_instance(
    "node1",
    main_configs=["configs/fast_background_pool.xml", "configs/compat.xml"],
    **_COMMON_INSTANCE_KWARGS,
)
node2 = cluster.add_instance(
    "node2",
    main_configs=["configs/fast_background_pool.xml"],
    **_COMMON_INSTANCE_KWARGS,
)

# node3 runs with no extra main configs.
node3 = cluster.add_instance("node3", **_COMMON_INSTANCE_KWARGS)
|
|
|
|
|
|
@pytest.fixture(scope="module")
def started_cluster():
    """Start the module-level ``cluster`` once per test module.

    Yields the started cluster object to the tests and shuts it down
    afterwards.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        # Executed on regular teardown and also if start-up itself raised.
        cluster.shutdown()
|
|
|
|
|
|
def test_merge_tree_load_parts(started_cluster):
    """Check which parts node1 logs as Active / Outdated when it restarts.

    Steps:
      1. Create 20 single-row parts with merges stopped, restart the server
         and expect every part ``44_{i}_{i}_0`` to be logged as an Active part.
      2. Merge everything with OPTIMIZE FINAL, restart again and expect only
         the merged part ``44_1_20*`` to be logged as Active while the source
         parts are logged as Outdated.
      3. Set ``old_parts_lifetime = 1``, re-attach the table and poll the table
         directory until only the merged part directory is left.
    """
    node1.query(
        """
        CREATE TABLE mt_load_parts (pk UInt32, id UInt32, s String)
        ENGINE = MergeTree ORDER BY id PARTITION BY pk"""
    )

    # Merges are stopped so that the 20 inserts below are not merged before
    # the first restart (the assertions expect parts 44_1_1_0 .. 44_20_20_0).
    node1.query("SYSTEM STOP MERGES mt_load_parts")

    for i in range(20):
        node1.query(
            f"INSERT INTO mt_load_parts VALUES (44, {i}, randomPrintableASCII(10))"
        )

    node1.restart_clickhouse(kill=True)
    for i in range(1, 21):
        assert node1.contains_in_log(f"Loading Active part 44_{i}_{i}_0")

    node1.query("OPTIMIZE TABLE mt_load_parts FINAL")
    node1.restart_clickhouse(kill=True)

    node1.query("SYSTEM WAIT LOADING PARTS mt_load_parts")

    assert node1.contains_in_log("Loading Active part 44_1_20")
    for i in range(1, 21):
        assert not node1.contains_in_log(f"Loading Active part 44_{i}_{i}_0")
        assert node1.contains_in_log(f"Loading Outdated part 44_{i}_{i}_0")

    assert node1.query("SELECT count() FROM mt_load_parts") == "20\n"

    assert (
        node1.query(
            "SELECT count() FROM system.parts WHERE table = 'mt_load_parts' AND active"
        )
        == "1\n"
    )

    node1.query("ALTER TABLE mt_load_parts MODIFY SETTING old_parts_lifetime = 1")
    node1.query("DETACH TABLE mt_load_parts")
    node1.query("ATTACH TABLE mt_load_parts")

    node1.query("SYSTEM WAIT LOADING PARTS mt_load_parts")

    table_path = node1.query(
        "SELECT data_paths[1] FROM system.tables WHERE table = 'mt_load_parts'"
    ).strip()

    # Entries of the table directory that are not part directories.
    non_part_entries = {"detached", "format_version.txt"}

    # Poll the table directory until only the merged part directory remains.
    # The listing is taken only inside the loop: a listing made before the
    # loop would be overwritten by the first iteration anyway.
    MAX_RETRY = 10
    part_dirs = []
    part_dirs_ok = False
    for _ in range(MAX_RETRY):
        listing = node1.exec_in_container(
            ["bash", "-c", f"ls {table_path}"], user="root"
        )
        part_dirs = list(set(listing.strip().split("\n")) - non_part_entries)
        part_dirs_ok = len(part_dirs) == 1 and part_dirs[0].startswith("44_1_20")
        if part_dirs_ok:
            break
        time.sleep(2)

    assert (
        part_dirs_ok
    ), f"Unexpected part directories left in {table_path}: {part_dirs}"
|
|
|
|
|
|
def test_merge_tree_load_parts_corrupted(started_cluster):
    """Check part loading on node1 after several parts were corrupted on disk.

    A replicated table is created on node1 and node2 and filled through node1
    so that each of the partitions 111, 222 and 333 goes through two inserts,
    a merge, a third insert and another merge. Selected parts are then
    corrupted on node1, the server is restarted, and the server log is
    inspected to verify which parts were loaded, which failed to load and
    which were never attempted. Finally the data and the active part count
    per partition are verified after syncing with the replica.
    """
    table = "mt_load_parts_2"

    for replica_idx, replica in enumerate([node1, node2]):
        replica.query(
            f"""
            CREATE TABLE mt_load_parts_2 (pk UInt32, id UInt32, s String)
            ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/mt_load_parts_2', '{replica_idx}') ORDER BY id PARTITION BY pk"""
        )

    def insert_row(partition, row_id):
        node1.query(
            f"INSERT INTO mt_load_parts_2 VALUES ({partition}, {row_id}, randomPrintableASCII(10))"
        )

    def merge_partition(partition):
        node1.query(f"OPTIMIZE TABLE mt_load_parts_2 PARTITION {partition} FINAL")

    # min-max blocks in created parts: 1_1_0, 2_2_0, 1_2_1, 3_3_0, 1_3_2
    # NOTE(review): the lookups below use block numbers 0..2, so the note
    # above looks 1-based while the queries are 0-based -- confirm.
    for partition in [111, 222, 333]:
        insert_row(partition, 0)
        insert_row(partition, 1)
        merge_partition(partition)
        insert_row(partition, 2)
        merge_partition(partition)

    node2.query("SYSTEM SYNC REPLICA mt_load_parts_2", timeout=30)

    def get_part_name(node, partition, min_block, max_block):
        """Return the name of the part with the given partition and block range."""
        name = node.query(
            f"""
            SELECT name FROM system.parts
            WHERE table = 'mt_load_parts_2'
            AND partition = '{partition}'
            AND min_block_number = {min_block}
            AND max_block_number = {max_block}"""
        )
        return name.strip()

    # (partition, min_block, max_block) of every part to corrupt on node1.
    parts_to_corrupt = [
        (111, 0, 2),
        (222, 0, 2),
        (222, 0, 1),
        (333, 0, 1),
        (333, 2, 2),
    ]
    for corrupt_partition, min_block, max_block in parts_to_corrupt:
        broken_part = get_part_name(node1, corrupt_partition, min_block, max_block)
        corrupt_part_data_on_disk(node1, table, broken_part)

    node1.restart_clickhouse(kill=True)
    node1.query("SYSTEM WAIT LOADING PARTS mt_load_parts_2")

    def check_parts_loading(node, partition, loaded, failed, skipped):
        """Verify the part-loading log messages of one partition.

        ``loaded``, ``failed`` and ``skipped`` are lists of
        ``(min_block, max_block)`` pairs:
          * loaded  -- loading must both start and finish;
          * failed  -- loading must start but never finish;
          * skipped -- loading must neither start nor finish.
        """
        # The whole test produces around 6-700 lines, so 2k is plenty enough.
        # wait_for_log_line uses tail + grep, so the overhead is negligible
        look_behind_lines = 2000

        def part_prefixes(block_ranges):
            return [f"{partition}_{lo}_{hi}" for lo, hi in block_ranges]

        for prefix in part_prefixes(loaded):
            assert node.wait_for_log_line(
                f"Loading Active part {prefix}", look_behind_lines=look_behind_lines
            )
            assert node.wait_for_log_line(
                f"Finished loading Active part {prefix}",
                look_behind_lines=look_behind_lines,
            )

        # First wait until the log mentions every part that is expected to
        # fail; only afterwards check that none of them finished loading.
        failed_prefixes = part_prefixes(failed)
        for prefix in failed_prefixes:
            assert node.wait_for_log_line(
                f"Loading Active part {prefix}", look_behind_lines=look_behind_lines
            )

        for prefix in failed_prefixes:
            assert not node.contains_in_log(f"Finished loading Active part {prefix}")

        for prefix in part_prefixes(skipped):
            assert not node.contains_in_log(f"Loading Active part {prefix}")
            assert not node.contains_in_log(f"Finished loading Active part {prefix}")

    check_parts_loading(
        node1, 111, loaded=[(0, 1), (2, 2)], failed=[(0, 2)], skipped=[(0, 0), (1, 1)]
    )
    check_parts_loading(
        node1, 222, loaded=[(0, 0), (1, 1), (2, 2)], failed=[(0, 2), (0, 1)], skipped=[]
    )
    check_parts_loading(
        node1, 333, loaded=[(0, 2)], failed=[], skipped=[(0, 0), (1, 1), (2, 2), (0, 1)]
    )

    node1.query("SYSTEM SYNC REPLICA mt_load_parts_2", timeout=30)
    node1.query("OPTIMIZE TABLE mt_load_parts_2 FINAL")
    node1.query("SYSTEM SYNC REPLICA mt_load_parts_2", timeout=30)

    rows_per_partition = node1.query(
        """
        SELECT pk, count() FROM mt_load_parts_2
        GROUP BY pk ORDER BY pk"""
    )
    assert rows_per_partition == "111\t3\n222\t3\n333\t3\n"

    active_parts_per_partition = node1.query(
        """
        SELECT partition, count()
        FROM system.parts WHERE table = 'mt_load_parts_2' AND active
        GROUP BY partition ORDER BY partition"""
    )
    assert active_parts_per_partition == "111\t1\n222\t1\n333\t1\n"
|
|
|
|
|
|
def test_merge_tree_load_parts_filesystem_error(started_cluster):
    """Check that an exception thrown on part creation is handled at load time.

    Two single-row parts are created on node3, the mark file of part
    ``all_1_1_0`` is renamed, and the server is restarted. Afterwards only the
    row of the untouched part must be readable and the damaged part must be
    listed in ``system.detached_parts`` as ``broken-on-start_all_1_1_0``.

    The test is skipped on sanitizer and debug builds.
    """
    if node3.is_built_with_sanitizer() or node3.is_debug_build():
        # Adjacent string literals are used instead of a backslash continuation
        # inside the literal, so the message does not depend on indentation.
        pytest.skip(
            "Skip with debug build and sanitizers. "
            "This test intentionally triggers LOGICAL_ERROR which leads to crash with those builds"
        )

    node3.query(
        """
        CREATE TABLE mt_load_parts (id UInt32)
        ENGINE = MergeTree ORDER BY id
        SETTINGS index_granularity_bytes = 0"""
    )

    node3.query("SYSTEM STOP MERGES mt_load_parts")

    for i in range(2):
        node3.query(f"INSERT INTO mt_load_parts VALUES ({i})")

    # We want to somehow check that an exception thrown on part creation is
    # handled during part loading. It can be a filesystem exception triggered
    # at initialization of part storage, but it is hard to trigger because it
    # should be an exception on stat/listDirectory.
    # The easiest way to trigger such an exception is to use chmod, but the
    # clickhouse server is run as root in integration tests and this won't
    # work. So let's do some stupid things: create a table without adaptive
    # granularity and change the mark extensions of data files in the part to
    # make clickhouse think that it's a compact part, which cannot be created
    # in such a table. This will trigger a LOGICAL_ERROR on part creation.

    def corrupt_part(table, part_name):
        """Rename the ``id`` mark file of the given part from .cmrk to .cmrk3."""
        part_path = node3.query(
            f"SELECT path FROM system.parts WHERE table = '{table}' and name = '{part_name}'"
        ).strip()

        # The path is used as a prefix without adding a separator, so it is
        # presumably returned with a trailing slash -- TODO confirm.
        node3.exec_in_container(
            ["bash", "-c", f"mv {part_path}id.cmrk {part_path}id.cmrk3"],
            privileged=True,
        )

    corrupt_part("mt_load_parts", "all_1_1_0")
    node3.restart_clickhouse(kill=True)

    # Only the row from the untouched second part is still readable.
    assert node3.query("SELECT * FROM mt_load_parts") == "1\n"
    assert (
        node3.query(
            "SELECT name FROM system.detached_parts WHERE table = 'mt_load_parts'"
        )
        == "broken-on-start_all_1_1_0\n"
    )
|