"""Integration tests for how ClickHouse handles broken projection parts.

Each test corrupts projection (or parent-part) files on disk inside the
container, then checks SELECT behaviour, CHECK TABLE output,
system.projection_parts / system.errors reporting, merges, replication and
BACKUP/RESTORE.
"""

import time
import pytest
import logging
import string
import random

from helpers.cluster import ClickHouseCluster


@pytest.fixture(scope="module")
def cluster():
    """Start a one-node cluster (with ZooKeeper) shared by all tests in this module."""
    # Build the cluster outside `try`: if the constructor itself fails there is
    # nothing to shut down, and `finally` would otherwise raise UnboundLocalError
    # and hide the original exception.
    ch_cluster = ClickHouseCluster(__file__)
    try:
        ch_cluster.add_instance(
            "node",
            main_configs=["config.d/backups.xml"],
            stay_alive=True,
            with_zookeeper=True,
        )

        logging.info("Starting cluster...")
        ch_cluster.start()
        logging.info("Cluster started")

        yield ch_cluster
    finally:
        ch_cluster.shutdown()


def create_table(node, table, replica, data_prefix=""):
    """(Re)create `table` as a ReplicatedMergeTree with projections `proj` and `proj_2`.

    Args:
        node: cluster instance to run the DDL on.
        table: table name; any existing table with this name is dropped first.
        replica: replica name used in the ReplicatedMergeTree path.
        data_prefix: component of the ZooKeeper path; defaults to `table`.
            Passing another table's name makes this table a replica of it.
    """
    data_prefix = data_prefix or table
    node.query(
        f"""
        DROP TABLE IF EXISTS {table} SYNC;
        CREATE TABLE {table}
        (
            a String,
            b String,
            c Int64,
            d Int64,
            e Int64,
            PROJECTION proj
            (
                SELECT c ORDER BY d
            ),
            PROJECTION proj_2
            (
                SELECT d ORDER BY c
            )
        )
        ENGINE = ReplicatedMergeTree('/test_broken_projection_{data_prefix}/data/', '{replica}') ORDER BY a
        SETTINGS min_bytes_for_wide_part = 0,
            max_parts_to_merge_at_once=3,
            enable_vertical_merge_algorithm=1,
            vertical_merge_algorithm_min_rows_to_activate = 1,
            vertical_merge_algorithm_min_columns_to_activate = 1,
            compress_primary_key=0;
        """
    )


def insert(node, table, offset, size):
    """Insert `size` rows generated from numbers starting at `offset` into `table`."""
    node.query(
        f"""
        INSERT INTO {table}
        SELECT number, number, number, number, number%2 FROM numbers({offset}, {size})
        SETTINGS insert_keeper_fault_injection_probability=0.0;
        """
    )


def get_parts(node, table):
    """Return the names of active parts of `table` (current database), sorted by name.

    Returns an empty list when the table has no active parts.
    """
    output = node.query(
        f"""
        SELECT name
        FROM system.parts
        WHERE table='{table}' AND database=currentDatabase() AND active = 1
        ORDER BY name;
        """
    ).strip()
    # "".split("\n") would yield [""], not an empty list.
    return output.split("\n") if output else []


def bash(node, command):
    """Run `command` via `bash -c` inside the node's container as privileged root."""
    node.exec_in_container(["bash", "-c", command], privileged=True, user="root")


def _assert_path_is_absolute(node, path):
    """Fail (server-side throwIf) unless `path` starts with '/'.

    Guards the `rm` calls below against relative paths; an empty `path`
    (no matching part found) also fails this check.
    """
    node.query(
        f"select throwIf(substring('{path}', 1, 1) != '/', 'Path is relative: {path}')"
    )


def break_projection(node, table, part, parent_part, break_type):
    """Corrupt projection `part` of parent part `parent_part` on disk.

    Args:
        break_type: "data" removes d.bin and c.bin, "metadata" removes
            columns.txt, "part" removes the whole projection directory.

    Raises:
        ValueError: if `break_type` is not one of the values above.
    """
    if break_type not in ("data", "metadata", "part"):
        # Fail loudly instead of silently leaving the projection intact.
        raise ValueError(f"Unknown break_type: {break_type!r}")

    part_path = node.query(
        f"""
        SELECT path
        FROM system.projection_parts
        WHERE table='{table}'
        AND database=currentDatabase()
        AND active=1
        AND part_name='{part}'
        AND parent_name='{parent_part}'
        ORDER BY modification_time DESC
        LIMIT 1;
        """
    ).strip()

    _assert_path_is_absolute(node, part_path)

    if break_type == "data":
        bash(node, f"rm '{part_path}/d.bin'")
        bash(node, f"rm '{part_path}/c.bin'")
    elif break_type == "metadata":
        bash(node, f"rm '{part_path}/columns.txt'")
    else:  # "part"
        bash(node, f"rm -r '{part_path}'")


def break_part(node, table, part):
    """Corrupt active data part `part` of `table` by deleting its columns.txt."""
    part_path = node.query(
        f"""
        SELECT path
        FROM system.parts
        WHERE table='{table}'
        AND database=currentDatabase()
        AND active=1
        AND part_name='{part}'
        ORDER BY modification_time DESC
        LIMIT 1;
        """
    ).strip()

    _assert_path_is_absolute(node, part_path)
    bash(node, f"rm '{part_path}/columns.txt'")


def get_broken_projections_info(node, table):
    """Return "parent_name\\tname\\terror_name" lines for projections marked broken.

    The exception code stored in system.projection_parts is mapped to an error
    name by joining with system.errors, so a projection only appears here if its
    error code is present in system.errors.
    """
    return node.query(
        f"""
        SELECT parent_name, name, errors.name FROM
        (
            SELECT parent_name, name, exception_code
            FROM system.projection_parts
            WHERE table='{table}'
            AND database=currentDatabase()
            AND is_broken = 1
        ) AS parts_info
        INNER JOIN system.errors AS errors
        ON parts_info.exception_code = errors.code
        ORDER BY parent_name, name
        """
    ).strip()


def optimize(node, table, final, no_wait):
    """Run OPTIMIZE TABLE, optionally FINAL and/or with alter_sync=0."""
    query = f"OPTIMIZE TABLE {table}"
    if final:
        query += " FINAL"
    if no_wait:
        query += " SETTINGS alter_sync=0"
    node.query(query)


def reattach(node, table):
    """DETACH and re-ATTACH `table`."""
    node.query(
        f"""
        DETACH TABLE {table};
        ATTACH TABLE {table};
        """
    )


def materialize_projection(node, table, proj):
    """Run MATERIALIZE PROJECTION `proj` with mutations_sync=2."""
    node.query(
        f"ALTER TABLE {table} MATERIALIZE PROJECTION {proj} SETTINGS mutations_sync=2"
    )


def check_table_full(node, table):
    """Return the per-part CHECK TABLE output (check_query_single_value_result = 0)."""
    return node.query(
        f"CHECK TABLE {table} SETTINGS check_query_single_value_result = 0;"
    ).strip()


def random_str(length=6):
    """Return a random string of lowercase letters and digits (used as query_id)."""
    alphabet = string.ascii_lowercase + string.digits
    return "".join(random.SystemRandom().choice(alphabet) for _ in range(length))


def _assert_uses_projection(node, query):
    """Run `query` and assert system.query_log lists a projection for it.

    NOTE: the check is a substring match on "proj", so it is satisfied by
    either `proj` or `proj_2`.
    """
    query_id = random_str()
    node.query(query, query_id=query_id)
    assert "proj" in node.query(
        f"""
        SYSTEM FLUSH LOGS;
        SELECT query, splitByChar('.', arrayJoin(projections))[-1]
        FROM system.query_log
        WHERE current_database=currentDatabase() AND query_id='{query_id}' AND type='QueryFinish'
        """
    )


def check(node, table, check_result, expect_broken_part="", expected_error=""):
    """Exercise both projections of `table`, then verify CHECK TABLE.

    Args:
        check_result: expected single-value CHECK TABLE result (1 ok, 0 broken).
        expect_broken_part: "proj" or "proj_2" to expect the SELECT that targets
            that projection to fail; any other value expects both to succeed.
        expected_error: substring that must appear in the expected failure.
    """
    if expect_broken_part == "proj":
        assert expected_error in node.query_and_get_error(
            f"SELECT c FROM '{table}' WHERE d == 12 ORDER BY c"
        )
    else:
        _assert_uses_projection(
            node, f"SELECT c FROM '{table}' WHERE d == 12 OR d == 16 ORDER BY c"
        )

    if expect_broken_part == "proj_2":
        assert expected_error in node.query_and_get_error(
            f"SELECT d FROM '{table}' WHERE c == 12 ORDER BY d"
        )
    else:
        _assert_uses_projection(
            node, f"SELECT d FROM '{table}' WHERE c == 12 OR c == 16 ORDER BY d"
        )

    assert check_result == int(node.query(f"CHECK TABLE {table}"))


def test_broken_ignored(cluster):
    node = cluster.instances["node"]

    table_name = "test1"
    create_table(node, table_name, 1)

    insert(node, table_name, 0, 5)
    insert(node, table_name, 5, 5)
    insert(node, table_name, 10, 5)
    insert(node, table_name, 15, 5)

    assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts(
        node, table_name
    )

    # Break metadata (columns.txt) file of projection 'proj'
    break_projection(node, table_name, "proj", "all_2_2_0", "metadata")

    # Run a select and then a "check table" query.
    # Select works because it does not read columns.txt.
    # But expect check table result as 0.
    check(node, table_name, 0)

    # Projection 'proj' from part all_2_2_0 will now appear in broken parts info
    # because it was marked broken during "check table" query.
    assert "all_2_2_0\tproj\tFILE_DOESNT_EXIST" in get_broken_projections_info(
        node, table_name
    )

    # Check table query will also show a list of parts which have broken projections.
    assert "all_2_2_0" in check_table_full(node, table_name)

    # Break data file of projection 'proj_2' for part all_2_2_0
    break_projection(node, table_name, "proj_2", "all_2_2_0", "data")

    # It will not yet appear in broken projections info.
    assert "proj_2" not in get_broken_projections_info(node, table_name)

    # Select now fails with error "File doesn't exist"
    check(node, table_name, 0, "proj_2", "FILE_DOESNT_EXIST")

    # Projection 'proj_2' from part all_2_2_0 will now appear in broken parts info.
    assert "all_2_2_0\tproj_2\tNO_FILE_IN_DATA_PART" in get_broken_projections_info(
        node, table_name
    )

    # Second select works, because projection is now marked as broken.
    check(node, table_name, 0)

    # Break data file of projection 'proj_2' for part all_3_3_0
    break_projection(node, table_name, "proj_2", "all_3_3_0", "data")

    # It will not yet appear in broken projections info.
    assert "all_3_3_0" not in get_broken_projections_info(node, table_name)

    insert(node, table_name, 20, 5)
    insert(node, table_name, 25, 5)

    # Part all_3_3_0 has 'proj' and 'proj_2' projections, but 'proj_2' is broken and server does NOT know it yet.
    # Parts all_4_4_0 and all_5_5_0 have both non-broken projections.
    # So a merge will be created for future part all_3_5_1.
    # During merge it will fail to read from 'proj_2' of part all_3_3_0 and proj_2 will be marked broken.
    # Merge will be retried and on second attempt it will succeed.
    # The result part all_3_5_1 will have only 1 projection - 'proj', because
    # it will skip 'proj_2' as it will see that one part does not have it anymore in the set of valid projections.
    optimize(node, table_name, final=False, no_wait=True)
    time.sleep(5)

    # NOTE: disabled check that the failed merge attempt was logged.
    # table_uuid=node.query(f"SELECT uuid FROM system.tables WHERE table='{table_name}' and database=currentDatabase()").strip()
    # assert 0 < int(
    #     node.query(
    #         f"""
    #     SYSTEM FLUSH LOGS;
    #     SELECT count() FROM system.text_log
    #     WHERE level='Error'
    #     AND logger_name='MergeTreeBackgroundExecutor'
    #     AND message like 'Exception while executing background task %{table_uuid}:all_3_5_1%%Cannot open file%proj_2.proj/c.bin%'
    #     """)
    # )

    assert "all_3_3_0" in get_broken_projections_info(node, table_name)
    check(node, table_name, 0)


def test_materialize_broken_projection(cluster):
    node = cluster.instances["node"]

    table_name = "test2"
    create_table(node, table_name, 1)

    insert(node, table_name, 0, 5)
    insert(node, table_name, 5, 5)
    insert(node, table_name, 10, 5)
    insert(node, table_name, 15, 5)

    assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts(
        node, table_name
    )

    break_projection(node, table_name, "proj", "all_1_1_0", "metadata")
    reattach(node, table_name)

    assert "all_1_1_0\tproj\tNO_FILE_IN_DATA_PART" in get_broken_projections_info(
        node, table_name
    )
    assert "Part all_1_1_0 has a broken projection proj" in check_table_full(
        node, table_name
    )

    break_projection(node, table_name, "proj_2", "all_1_1_0", "data")
    reattach(node, table_name)

    assert "all_1_1_0\tproj_2\tFILE_DOESNT_EXIST" in get_broken_projections_info(
        node, table_name
    )
    assert "Part all_1_1_0 has a broken projection proj_2" in check_table_full(
        node, table_name
    )

    materialize_projection(node, table_name, "proj")

    assert "has a broken projection" not in check_table_full(node, table_name)


def test_broken_ignored_replicated(cluster):
    node = cluster.instances["node"]

    table_name = "test3"
    table_name2 = "test3_replica"
    create_table(node, table_name, 1)

    insert(node, table_name, 0, 5)
    insert(node, table_name, 5, 5)
    insert(node, table_name, 10, 5)
    insert(node, table_name, 15, 5)

    check(node, table_name, 1)

    # Second replica of the same ZooKeeper path (data_prefix = first table's name).
    create_table(node, table_name2, 2, table_name)
    check(node, table_name2, 1)

    break_projection(node, table_name, "proj", "all_0_0_0", "data")
    assert "Part all_0_0_0 has a broken projection proj" in check_table_full(
        node, table_name
    )

    break_part(node, table_name, "all_0_0_0")
    node.query(f"SYSTEM SYNC REPLICA {table_name}")
    assert "has a broken projection" not in check_table_full(node, table_name)


def test_broken_projections_in_backups(cluster):
    node = cluster.instances["node"]

    table_name = "test4"
    create_table(node, table_name, 1)

    insert(node, table_name, 0, 5)
    insert(node, table_name, 5, 5)
    insert(node, table_name, 10, 5)
    insert(node, table_name, 15, 5)

    check(node, table_name, 1)

    break_projection(node, table_name, "proj", "all_2_2_0", "data")
    check(node, table_name, 0, "proj", "FILE_DOESNT_EXIST")

    assert "all_2_2_0\tproj\tNO_FILE_IN_DATA_PART" in get_broken_projections_info(
        node, table_name
    )

    assert "BACKUP_CREATED" in node.query(
        f"""
        set backup_restore_keeper_fault_injection_probability=0.0;
        backup table {table_name} to Disk('backups', 'b1') settings check_projection_parts=false;
        """
    )

    assert "RESTORED" in node.query(
        f"""
        drop table {table_name} sync;
        set backup_restore_keeper_fault_injection_probability=0.0;
        restore table {table_name} from Disk('backups', 'b1');
        """
    )

    check(node, table_name, 1)

    assert "" == get_broken_projections_info(node, table_name)
    # TODO: add a check for what projections are loaded

    break_projection(node, table_name, "proj", "all_2_2_0", "part")

    check(node, table_name, 0, "proj", "ErrnoException")

    assert "all_2_2_0\tproj\tFILE_DOESNT_EXIST" == get_broken_projections_info(
        node, table_name
    )

    # Without check_projection_parts=false the backup must refuse the broken projection.
    assert "FILE_DOESNT_EXIST" in node.query_and_get_error(
        f"""
        set backup_restore_keeper_fault_injection_probability=0.0;
        backup table {table_name} to Disk('backups', 'b2')
        """
    )

    materialize_projection(node, table_name, "proj")

    check(node, table_name, 1)
    # TODO:
    # assert "all_2_2_0\tproj\tFILE_DOESNT_EXIST" == get_broken_projections_info(node, table_name)

    assert "BACKUP_CREATED" in node.query(
        f"""
        set backup_restore_keeper_fault_injection_probability=0.0;
        backup table {table_name} to Disk('backups', 'b3') settings check_projection_parts=false;
        """
    )

    assert "RESTORED" in node.query(
        f"""
        drop table {table_name} sync;
        set backup_restore_keeper_fault_injection_probability=0.0;
        restore table {table_name} from Disk('backups', 'b3');
        """
    )
    check(node, table_name, 1)

    break_projection(node, table_name, "proj", "all_1_1_0", "part")
    # TODO: check(node, table_name, 0, "proj", "FILE_DOESNT_EXIST")
    assert "Part all_1_1_0 has a broken projection proj" in check_table_full(
        node, table_name
    )
    assert "all_1_1_0\tproj\tFILE_DOESNT_EXIST" == get_broken_projections_info(
        node, table_name
    )

    assert "BACKUP_CREATED" in node.query(
        f"""
        set backup_restore_keeper_fault_injection_probability=0.0;
        backup table {table_name} to Disk('backups', 'b4') settings check_projection_parts=false;
        """
    )

    assert "RESTORED" in node.query(
        f"""
        drop table {table_name} sync;
        set backup_restore_keeper_fault_injection_probability=0.0;
        restore table {table_name} from Disk('backups', 'b4');
        """
    )
    check(node, table_name, 1)

    assert "" == get_broken_projections_info(node, table_name)