mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
569 lines
17 KiB
Python
569 lines
17 KiB
Python
import time
|
|
import pytest
|
|
import logging
|
|
import string
|
|
import random
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
# NOTE(review): the name ``cluster`` is rebound by the fixture function of the
# same name defined right after this statement, so this module-level instance
# looks unused — confirm before removing it.
cluster = ClickHouseCluster(__file__)
|
|
|
|
|
|
@pytest.fixture(scope="module")
def cluster():
    """Start a one-instance ClickHouse cluster for this module and shut it down afterwards.

    The cluster gets a single instance named ``node`` that uses
    ``config.d/backups.xml``, has ZooKeeper enabled and is created with
    ``stay_alive=True``.

    Yields:
        The started ``ClickHouseCluster`` object.
    """
    # Build the cluster object before entering ``try``: if the constructor
    # itself fails there is nothing to shut down, and the ``finally`` clause
    # must not replace the real error with an ``UnboundLocalError``.
    ch_cluster = ClickHouseCluster(__file__)
    try:
        ch_cluster.add_instance(
            "node",
            main_configs=["config.d/backups.xml"],
            stay_alive=True,
            with_zookeeper=True,
        )

        logging.info("Starting cluster...")
        ch_cluster.start()
        logging.info("Cluster started")

        yield ch_cluster
    finally:
        ch_cluster.shutdown()
|
|
|
|
|
|
def create_table(node, table, replica, data_prefix="", aggressive_merge=True):
    """(Re)create a ReplicatedMergeTree table with two projections.

    The table is dropped first if it exists. It has columns ``a``..``e`` and
    the projections ``proj`` (``SELECT c ORDER BY d``) and ``proj_2``
    (``SELECT d ORDER BY c``).

    Args:
        node: instance on which the DDL is executed via ``node.query``.
        table: table name.
        replica: replica name passed to the ReplicatedMergeTree engine.
        data_prefix: suffix of the ZooKeeper path; the table name is used
            when it is the empty string.
        aggressive_merge: when true, the vertical merge thresholds are set to
            1 row / 1 column; otherwise to 100000 rows / 100 columns.
    """
    zk_prefix = table if data_prefix == "" else data_prefix
    min_rows, min_columns = (1, 1) if aggressive_merge else (100000, 100)

    ddl = f"""
        DROP TABLE IF EXISTS {table} SYNC;
        CREATE TABLE {table}
        (
            a String,
            b String,
            c Int64,
            d Int64,
            e Int64,

            PROJECTION proj
            (
                SELECT c ORDER BY d
            ),
            PROJECTION proj_2
            (
                SELECT d ORDER BY c
            )
        )
        ENGINE = ReplicatedMergeTree('/test_broken_projection_{zk_prefix}/data/', '{replica}') ORDER BY a
        SETTINGS min_bytes_for_wide_part = 0,
            max_parts_to_merge_at_once=3,
            enable_vertical_merge_algorithm=1,
            vertical_merge_algorithm_min_rows_to_activate = {min_rows},
            vertical_merge_algorithm_min_columns_to_activate = {min_columns},
            compress_primary_key=0;
        """
    node.query(ddl)
|
|
|
|
|
|
def insert(node, table, offset, size):
    """Insert ``size`` rows taken from ``numbers(offset, size)`` into ``table``.

    Every row is ``(number, number, number, number, number % 2)``. Keeper
    fault injection is switched off for the insert.
    """
    statement = f"""
        INSERT INTO {table}
        SELECT number, number, number, number, number%2 FROM numbers({offset}, {size})
        SETTINGS insert_keeper_fault_injection_probability=0.0;
        """
    node.query(statement)
|
|
|
|
|
|
def get_parts(node, table):
    """Return the names of the active parts of ``table``, ordered by name.

    Only parts of the current database are considered.

    Args:
        node: instance queried via ``node.query``.
        table: table name to look up in ``system.parts``.

    Returns:
        A list with one part name per element. When the query returns
        nothing the result is ``[""]`` (splitting an empty string).
    """
    # The original query text ended with a stray double quote after the
    # semicolon; it is not part of the statement and has been removed.
    output = node.query(
        f"""
        SELECT name
        FROM system.parts
        WHERE table='{table}' AND database=currentDatabase() AND active = 1
        ORDER BY name;
        """
    )
    return output.strip().split("\n")
|
|
|
|
|
|
def bash(node, command):
    """Run ``command`` through ``bash -c`` in the node's container as privileged root.

    The result of ``exec_in_container`` is not returned.
    """
    argv = ["bash", "-c", command]
    node.exec_in_container(argv, privileged=True, user="root")
|
|
|
|
|
|
def break_projection(node, table, part, parent_part, break_type):
    """Damage one projection part on disk by removing files from it.

    Args:
        node: instance used for queries and for running ``rm`` in the container.
        table: table name.
        part: projection name as stored in ``system.projection_parts.part_name``
            (callers pass ``"proj"`` / ``"proj_2"``).
        parent_part: name of the parent data part (e.g. ``"all_2_2_0"``).
        break_type: what to remove:
            ``"data"`` removes ``d.bin`` and ``c.bin``,
            ``"metadata"`` removes ``columns.txt``,
            ``"part"`` removes the whole projection directory.

    Raises:
        ValueError: if ``break_type`` is not one of the values above.
    """
    # Reject unknown break types before touching the server. Previously an
    # unknown value fell through every branch and nothing was broken, so the
    # calling test would continue against an intact projection.
    known_break_types = ("data", "metadata", "part")
    if break_type not in known_break_types:
        raise ValueError(
            f"Unknown break_type {break_type!r}, expected one of {known_break_types}"
        )

    part_path = node.query(
        f"""
        SELECT path
        FROM system.projection_parts
        WHERE table='{table}'
        AND database=currentDatabase()
        AND active=1
        AND part_name='{part}'
        AND parent_name='{parent_part}'
        ORDER BY modification_time DESC
        LIMIT 1;
        """
    ).strip()

    # Fail if the path is not absolute (this also covers an empty result)
    # before any ``rm`` command is built from it.
    node.query(
        f"select throwIf(substring('{part_path}', 1, 1) != '/', 'Path is relative: {part_path}')"
    )

    if break_type == "data":
        bash(node, f"rm '{part_path}/d.bin'")
        bash(node, f"rm '{part_path}/c.bin'")
    elif break_type == "metadata":
        bash(node, f"rm '{part_path}/columns.txt'")
    else:  # break_type == "part"
        bash(node, f"rm -r '{part_path}'")
|
|
|
|
|
|
def break_part(node, table, part):
    """Damage a data part by deleting its ``columns.txt`` file.

    The on-disk path of the most recently modified active part called
    ``part`` is read from ``system.parts``; a ``throwIf`` query rejects a path
    that does not start with ``/`` before the file is removed.
    """
    lookup = f"""
        SELECT path
        FROM system.parts
        WHERE table='{table}'
        AND database=currentDatabase()
        AND active=1
        AND part_name='{part}'
        ORDER BY modification_time DESC
        LIMIT 1;
        """
    path = node.query(lookup).strip()

    path_guard = f"select throwIf(substring('{path}', 1, 1) != '/', 'Path is relative: {path}')"
    node.query(path_guard)

    bash(node, f"rm '{path}/columns.txt'")
|
|
|
|
|
|
def get_broken_projections_info(node, table):
    """Return the projection parts of ``table`` that are marked as broken.

    Each output row holds the parent part name, the projection name and the
    name of the error (looked up in ``system.errors`` by exception code),
    ordered by parent part and projection name. The stripped query output is
    returned as a single string.
    """
    sql = f"""
        SELECT parent_name, name, errors.name FROM
        (
            SELECT parent_name, name, exception_code
            FROM system.projection_parts
            WHERE table='{table}'
            AND database=currentDatabase()
            AND is_broken = 1
        ) AS parts_info
        INNER JOIN system.errors AS errors
        ON parts_info.exception_code = errors.code
        ORDER BY parent_name, name
        """
    raw = node.query(sql)
    return raw.strip()
|
|
|
|
|
|
def get_projections_info(node, table):
    """Return parent part, projection name and ``is_broken`` for active projection parts.

    Rows come from ``system.projection_parts`` for ``table`` in the current
    database, ordered by parent part and projection name. The stripped query
    output is returned as a single string.
    """
    sql = f"""
        SELECT parent_name, name, is_broken
        FROM system.projection_parts
        WHERE table='{table}'
        AND active = 1
        AND database=currentDatabase()
        ORDER BY parent_name, name
        """
    raw = node.query(sql)
    return raw.strip()
|
|
|
|
|
|
def optimize(node, table, final, no_wait):
    """Run ``OPTIMIZE TABLE`` on ``table``.

    Args:
        node: instance on which the query is executed.
        table: table name.
        final: when truthy, `` FINAL`` is appended.
        no_wait: when truthy, `` SETTINGS alter_sync=0`` is appended.
    """
    pieces = [f"OPTIMIZE TABLE {table}"]
    if final:
        pieces.append(" FINAL")
    if no_wait:
        pieces.append(" SETTINGS alter_sync=0")
    node.query("".join(pieces))
|
|
|
|
|
|
def reattach(node, table):
    """Detach ``table`` and attach it again within one multi-statement query."""
    statements = f"""
        DETACH TABLE {table};
        ATTACH TABLE {table};
        """
    node.query(statements)
|
|
|
|
|
|
def materialize_projection(node, table, proj):
    """Issue ``ALTER TABLE ... MATERIALIZE PROJECTION`` for ``proj`` with ``mutations_sync=2``."""
    alter = f"ALTER TABLE {table} MATERIALIZE PROJECTION {proj} SETTINGS mutations_sync=2"
    node.query(alter)
|
|
|
|
|
|
def check_table_full(node, table):
    """Return the stripped output of ``CHECK TABLE`` with ``check_query_single_value_result = 0``."""
    statement = f"CHECK TABLE {table} SETTINGS check_query_single_value_result = 0;"
    report = node.query(statement)
    return report.strip()
|
|
|
|
|
|
def random_str(length=6):
    """Return a random string of lowercase ASCII letters and digits.

    Args:
        length: number of characters to generate (default 6).

    Returns:
        A string of exactly ``length`` characters; empty when ``length`` is 0.
    """
    alphabet = string.ascii_lowercase + string.digits
    # Create the generator once instead of once per generated character.
    rng = random.SystemRandom()
    return "".join(rng.choice(alphabet) for _ in range(length))
|
|
|
|
|
|
def _assert_projection_select(node, table, selected, filtered, broken, expected_error):
    """Run one SELECT that filters on ``filtered`` and returns ``selected``.

    When ``broken`` is true the query must fail with an error containing
    ``expected_error``. Otherwise the query is run under a fresh query id and
    the projections recorded for it in ``system.query_log`` must mention
    ``proj``.
    """
    query_id = random_str()

    if broken:
        error = node.query_and_get_error(
            f"SELECT {selected} FROM '{table}' WHERE {filtered} == 12 ORDER BY {selected}"
        )
        assert expected_error in error
        return

    node.query(
        f"SELECT {selected} FROM '{table}' WHERE {filtered} == 12 OR {filtered} == 16 ORDER BY {selected}",
        query_id=query_id,
    )
    logged = node.query(
        f"""
        SYSTEM FLUSH LOGS;
        SELECT query, splitByChar('.', arrayJoin(projections))[-1]
        FROM system.query_log
        WHERE current_database=currentDatabase() AND query_id='{query_id}' AND type='QueryFinish'
        """
    )
    assert "proj" in logged


def check(node, table, check_result, expect_broken_part="", expected_error=""):
    """Exercise both projections with SELECTs and then verify ``CHECK TABLE``.

    Args:
        node: instance on which the queries are executed.
        table: table name.
        check_result: expected integer result of ``CHECK TABLE``.
        expect_broken_part: ``"proj"`` or ``"proj_2"`` if the SELECT for that
            projection is expected to fail; any other value means both
            SELECTs must succeed.
        expected_error: text that must appear in the error of the failing SELECT.
    """
    # (projection name, selected column, filtered column)
    scenarios = (("proj", "c", "d"), ("proj_2", "d", "c"))
    for projection, selected, filtered in scenarios:
        _assert_projection_select(
            node,
            table,
            selected,
            filtered,
            expect_broken_part == projection,
            expected_error,
        )

    assert check_result == int(node.query(f"CHECK TABLE {table}"))
|
|
|
|
|
|
def test_broken_ignored(cluster):
    """Broken projections are detected, recorded and skipped by selects and merges."""
    node = cluster.instances["node"]
    table = "test1"

    create_table(node, table, 1)
    for offset in (0, 5, 10, 15):
        insert(node, table, offset, 5)

    parts = get_parts(node, table)
    assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == parts

    # Remove the metadata file (columns.txt) of projection 'proj' in all_2_2_0.
    break_projection(node, table, "proj", "all_2_2_0", "metadata")

    # Run the selects and then "check table".
    # The selects still work because they do not read columns.txt,
    # but "check table" is expected to return 0.
    check(node, table, 0)

    # "check table" marked 'proj' of all_2_2_0 as broken, so it is now listed
    # in the broken projections info.
    broken_info = get_broken_projections_info(node, table)
    assert "all_2_2_0\tproj\tFILE_DOESNT_EXIST" in broken_info

    # The full "check table" output also names parts with broken projections.
    assert "all_2_2_0" in check_table_full(node, table)

    # Remove the data files of projection 'proj_2' in all_2_2_0.
    break_projection(node, table, "proj_2", "all_2_2_0", "data")

    # Nothing has touched it yet, so it is not reported as broken.
    assert "proj_2" not in get_broken_projections_info(node, table)

    # The select now fails with the error "File doesn't exist".
    check(node, table, 0, "proj_2", "FILE_DOESNT_EXIST")

    # After the failed select 'proj_2' of all_2_2_0 is reported as broken.
    broken_info = get_broken_projections_info(node, table)
    assert "all_2_2_0\tproj_2\tNO_FILE_IN_DATA_PART" in broken_info

    # A second select works because the projection is now marked as broken.
    check(node, table, 0)

    # Remove the data files of projection 'proj_2' in all_3_3_0.
    break_projection(node, table, "proj_2", "all_3_3_0", "data")

    # Again, it is not reported as broken yet.
    assert "all_3_3_0" not in get_broken_projections_info(node, table)

    for offset in (20, 25):
        insert(node, table, offset, 5)

    # Part all_3_3_0 has projections 'proj' and 'proj_2', but 'proj_2' is broken
    # and the server does NOT know it yet.
    # Parts all_4_4_0 and all_5_5_0 have both projections intact.
    # So a merge will be created for the future part all_3_5_1.
    # During the merge it will fail to read 'proj_2' of all_3_3_0 and 'proj_2'
    # will be marked broken.
    # The merge will be retried and will succeed on the second attempt.
    # The resulting part all_3_5_1 will have only one projection, 'proj',
    # because 'proj_2' is skipped once one source part no longer has it in its
    # set of valid projections.
    optimize(node, table, final=0, no_wait=1)
    time.sleep(5)

    # Disabled check, kept for reference:
    # table_uuid=node.query(f"SELECT uuid FROM system.tables WHERE table='{table_name}' and database=currentDatabase()").strip()
    # assert 0 < int(
    #    node.query(
    #        f"""
    #    SYSTEM FLUSH LOGS;
    #    SELECT count() FROM system.text_log
    #    WHERE level='Error'
    #    AND logger_name='MergeTreeBackgroundExecutor'
    #    AND message like 'Exception while executing background task %{table_uuid}:all_3_5_1%%Cannot open file%proj_2.proj/c.bin%'
    #    """)
    # )

    assert "all_3_3_0" in get_broken_projections_info(node, table)
    check(node, table, 0)
|
|
|
|
|
|
def test_materialize_broken_projection(cluster):
    """Broken projections are reported after re-attach, and not after MATERIALIZE PROJECTION."""
    node = cluster.instances["node"]
    table = "test2"
    broken_parent = "all_1_1_0"

    create_table(node, table, 1)
    for offset in (0, 5, 10, 15):
        insert(node, table, offset, 5)

    parts = get_parts(node, table)
    assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == parts

    # (projection, what to break, error name expected after re-attaching the table)
    steps = (
        ("proj", "metadata", "NO_FILE_IN_DATA_PART"),
        ("proj_2", "data", "FILE_DOESNT_EXIST"),
    )
    for projection, break_type, error_name in steps:
        break_projection(node, table, projection, broken_parent, break_type)
        reattach(node, table)

        broken_info = get_broken_projections_info(node, table)
        assert f"{broken_parent}\t{projection}\t{error_name}" in broken_info

        report = check_table_full(node, table)
        assert f"Part {broken_parent} has a broken projection {projection}" in report

    materialize_projection(node, table, "proj")

    assert "has a broken projection" not in check_table_full(node, table)
|
|
|
|
|
|
def test_broken_ignored_replicated(cluster):
    """With a second replica, breaking a part leaves no broken projection after SYNC REPLICA."""
    node = cluster.instances["node"]
    source_table = "test3"
    replica_table = "test3_replica"

    create_table(node, source_table, 1)
    for offset in (0, 5, 10, 15):
        insert(node, source_table, offset, 5)
    check(node, source_table, 1)

    # The second table is created as replica 2 under the first table's ZooKeeper prefix.
    create_table(node, replica_table, 2, source_table)
    check(node, replica_table, 1)

    break_projection(node, source_table, "proj", "all_0_0_0", "data")
    report = check_table_full(node, source_table)
    assert "Part all_0_0_0 has a broken projection proj" in report

    # Break the whole part as well, then sync the replica.
    break_part(node, source_table, "all_0_0_0")
    node.query(f"SYSTEM SYNC REPLICA {source_table}")

    report = check_table_full(node, source_table)
    assert "has a broken projection" not in report
|
|
|
|
|
|
def get_random_string(string_length=8):
    """Return ``string_length`` random characters drawn from ASCII letters and digits.

    Uses the module-level ``random`` generator.
    """
    symbols = string.ascii_letters + string.digits
    picked = [random.choice(symbols) for _ in range(string_length)]
    return "".join(picked)
|
|
|
|
|
|
def _expected_projections_info(broken=()):
    """Build the expected ``get_projections_info`` output for the four test parts.

    ``broken`` is a collection of ``(parent part, projection)`` pairs whose
    ``is_broken`` flag is expected to be 1; all other rows expect 0.
    """
    rows = []
    for part in ("all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"):
        for projection in ("proj", "proj_2"):
            flag = 1 if (part, projection) in broken else 0
            rows.append(f"{part}\t{projection}\t{flag}")
    return "\n".join(rows)


def _backup_and_restore(node, table, backup_prefix, backup_settings):
    """Back ``table`` up to the ``backups`` disk, drop it and restore it from that backup.

    The backup name is ``backup_prefix`` plus a random suffix. Asserts that
    the backup reports ``BACKUP_CREATED`` and the restore reports ``RESTORED``.
    """
    backup_name = f"{backup_prefix}-{get_random_string()}"
    assert "BACKUP_CREATED" in node.query(
        f"""
        set backup_restore_keeper_fault_injection_probability=0.0;
        backup table {table} to Disk('backups', '{backup_name}') settings {backup_settings};
        """
    )
    assert "RESTORED" in node.query(
        f"""
        drop table {table} sync;
        set backup_restore_keeper_fault_injection_probability=0.0;
        restore table {table} from Disk('backups', '{backup_name}');
        """
    )


def test_broken_projections_in_backups(cluster):
    """Check how backups treat parts with broken projections.

    Covers: backing up with ``check_projection_parts=false`` after a
    projection lost its data files, a backup that fails once a projection
    directory is removed, a backup after re-materializing the projection,
    and a backup with ``allow_backup_broken_projections=true``.
    """
    node = cluster.instances["node"]

    table_name = "test4"
    create_table(node, table_name, 1, aggressive_merge=False)

    node.query("SYSTEM STOP MERGES")
    # Merges are restarted in ``finally`` so that a failing assertion does not
    # leave them stopped on the module-scoped node.
    try:
        for offset in (0, 5, 10, 15):
            insert(node, table_name, offset, 5)

        assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts(
            node, table_name
        )

        check(node, table_name, 1)

        # Remove the data files of 'proj' in all_2_2_0; the select fails and
        # the projection gets marked as broken.
        break_projection(node, table_name, "proj", "all_2_2_0", "data")
        check(node, table_name, 0, "proj", "FILE_DOESNT_EXIST")

        assert "all_2_2_0\tproj\tNO_FILE_IN_DATA_PART" in get_broken_projections_info(
            node, table_name
        )

        assert _expected_projections_info(
            broken={("all_2_2_0", "proj")}
        ) == get_projections_info(node, table_name)

        _backup_and_restore(node, table_name, "b1", "check_projection_parts=false")

        # After the restore no projection is flagged as broken.
        assert _expected_projections_info() == get_projections_info(node, table_name)

        check(node, table_name, 1)
        assert "" == get_broken_projections_info(node, table_name)

        # Remove the whole 'proj_2' projection directory of all_2_2_0.
        break_projection(node, table_name, "proj_2", "all_2_2_0", "part")

        check(node, table_name, 0, "proj_2", "ErrnoException")

        assert "all_2_2_0\tproj_2\tFILE_DOESNT_EXIST" == get_broken_projections_info(
            node, table_name
        )

        # A plain backup (no extra settings) is expected to fail here.
        assert "FILE_DOESNT_EXIST" in node.query_and_get_error(
            f"""
            set backup_restore_keeper_fault_injection_probability=0.0;
            backup table {table_name} to Disk('backups', 'b2')
            """
        )

        materialize_projection(node, table_name, "proj_2")
        check(node, table_name, 1)

        _backup_and_restore(node, table_name, "b3", "check_projection_parts=false")
        check(node, table_name, 1)

        assert _expected_projections_info() == get_projections_info(node, table_name)

        # Remove the whole 'proj' projection directory of all_1_1_0.
        break_projection(node, table_name, "proj", "all_1_1_0", "part")
        assert "Part all_1_1_0 has a broken projection proj" in check_table_full(
            node, table_name
        )
        assert "all_1_1_0\tproj\tFILE_DOESNT_EXIST" == get_broken_projections_info(
            node, table_name
        )

        _backup_and_restore(
            node,
            table_name,
            "b4",
            "check_projection_parts=false, allow_backup_broken_projections=true",
        )

        # The broken projection is still flagged after the restore.
        assert _expected_projections_info(
            broken={("all_1_1_0", "proj")}
        ) == get_projections_info(node, table_name)

        check(node, table_name, 0)
        assert "all_1_1_0\tproj\tNO_FILE_IN_DATA_PART" == get_broken_projections_info(
            node, table_name
        )
    finally:
        node.query("SYSTEM START MERGES")
|