import random import string import logging import pytest import time from helpers.cluster import ClickHouseCluster cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance('node1', main_configs=['configs/default_compression.xml', 'configs/wide_parts_only.xml'], with_zookeeper=True) node2 = cluster.add_instance('node2', main_configs=['configs/default_compression.xml', 'configs/wide_parts_only.xml'], with_zookeeper=True) node3 = cluster.add_instance('node3', main_configs=['configs/default_compression.xml', 'configs/wide_parts_only.xml'], image='yandex/clickhouse-server', tag='20.3.16', stay_alive=True, with_installed_binary=True) node4 = cluster.add_instance('node4') @pytest.fixture(scope="module") def start_cluster(): try: cluster.start() yield cluster finally: cluster.shutdown() def get_compression_codec_byte(node, table_name, part_name): cmd = "tail -c +17 /var/lib/clickhouse/data/default/{}/{}/data1.bin | od -x -N 1 | head -n 1 | awk '{{print $2}}'".format( table_name, part_name) return node.exec_in_container(["bash", "-c", cmd]).strip() def get_second_multiple_codec_byte(node, table_name, part_name): cmd = "tail -c +17 /var/lib/clickhouse/data/default/{}/{}/data1.bin | od -x -j 11 -N 1 | head -n 1 | awk '{{print $2}}'".format( table_name, part_name) return node.exec_in_container(["bash", "-c", cmd]).strip() def get_random_string(length): return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length)) CODECS_MAPPING = { 'LZ4': '0082', 'LZ4HC': '0082', # not an error, same byte 'ZSTD': '0090', 'Multiple': '0091', } def test_default_codec_single(start_cluster): for i, node in enumerate([node1, node2]): node.query(""" CREATE TABLE compression_table ( key UInt64, data1 String CODEC(Default) ) ENGINE = ReplicatedMergeTree('/t', '{}') ORDER BY tuple() PARTITION BY key; """.format(i)) # ZSTD(10) and ZSTD(10) after merge node1.query("INSERT INTO compression_table VALUES (1, 'x')") # ZSTD(10) and LZ4HC(10) after merge node1.query("INSERT INTO compression_table VALUES (2, '{}')".format(get_random_string(2048))) # ZSTD(10) and LZ4 after merge node1.query("INSERT INTO compression_table VALUES (3, '{}')".format(get_random_string(22048))) node2.query("SYSTEM SYNC REPLICA compression_table", timeout=15) # to reload parts node1.query("DETACH TABLE compression_table") node2.query("DETACH TABLE compression_table") node1.query("ATTACH TABLE compression_table") node2.query("ATTACH TABLE compression_table") node1.query("SYSTEM FLUSH LOGS") node2.query("SYSTEM FLUSH LOGS") # Same codec for all assert get_compression_codec_byte(node1, "compression_table", "1_0_0_0") == CODECS_MAPPING['ZSTD'] assert node1.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '1_0_0_0'") == "ZSTD(10)\n" assert node2.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '1_0_0_0'") == "ZSTD(10)\n" assert get_compression_codec_byte(node1, "compression_table", "2_0_0_0") == CODECS_MAPPING['ZSTD'] assert node1.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_0_0_0'") == "ZSTD(10)\n" assert node2.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_0_0_0'") == "ZSTD(10)\n" assert get_compression_codec_byte(node1, "compression_table", "3_0_0_0") == CODECS_MAPPING['ZSTD'] assert node1.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_0_0_0'") == "ZSTD(10)\n" assert node2.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_0_0_0'") == "ZSTD(10)\n" # just to be sure that replication works node1.query("OPTIMIZE TABLE compression_table FINAL") node2.query("SYSTEM SYNC REPLICA compression_table", timeout=15) # to reload parts node1.query("DETACH TABLE compression_table") node2.query("DETACH TABLE compression_table") node1.query("ATTACH TABLE compression_table") node2.query("ATTACH TABLE compression_table") node1.query("SYSTEM FLUSH LOGS") node2.query("SYSTEM FLUSH LOGS") assert get_compression_codec_byte(node1, "compression_table", "1_0_0_1") == CODECS_MAPPING['ZSTD'] assert node1.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '1_0_0_1'") == "ZSTD(10)\n" assert node2.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '1_0_0_1'") == "ZSTD(10)\n" assert get_compression_codec_byte(node1, "compression_table", "2_0_0_1") == CODECS_MAPPING['LZ4HC'] assert node1.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_0_0_1'") == "LZ4HC(5)\n" assert node2.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_0_0_1'") == "LZ4HC(5)\n" assert get_compression_codec_byte(node1, "compression_table", "3_0_0_1") == CODECS_MAPPING['LZ4'] assert node1.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_0_0_1'") == "LZ4\n" assert node2.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_0_0_1'") == "LZ4\n" assert node1.query("SELECT COUNT() FROM compression_table") == "3\n" assert node2.query("SELECT COUNT() FROM compression_table") == "3\n" node1.query("DROP TABLE compression_table SYNC") node2.query("DROP TABLE compression_table SYNC") def test_default_codec_multiple(start_cluster): for i, node in enumerate([node1, node2]): node.query(""" CREATE TABLE compression_table_multiple ( key UInt64, data1 String CODEC(NONE, Default) ) ENGINE = ReplicatedMergeTree('/d', '{}') ORDER BY tuple() PARTITION BY key; """.format(i), settings={"allow_suspicious_codecs": 1}) # ZSTD(10) and ZSTD(10) after merge node1.query("INSERT INTO compression_table_multiple VALUES (1, 'x')") # ZSTD(10) and LZ4HC(10) after merge node1.query("INSERT INTO compression_table_multiple VALUES (2, '{}')".format(get_random_string(2048))) # ZSTD(10) and LZ4 after merge node1.query("INSERT INTO compression_table_multiple VALUES (3, '{}')".format(get_random_string(22048))) # Same codec for all assert get_compression_codec_byte(node1, "compression_table_multiple", "1_0_0_0") == CODECS_MAPPING['Multiple'] assert get_second_multiple_codec_byte(node1, "compression_table_multiple", "1_0_0_0") == CODECS_MAPPING['ZSTD'] assert node1.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '1_0_0_0'") == "ZSTD(10)\n" assert node2.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '1_0_0_0'") == "ZSTD(10)\n" assert get_compression_codec_byte(node1, "compression_table_multiple", "2_0_0_0") == CODECS_MAPPING['Multiple'] assert get_second_multiple_codec_byte(node1, "compression_table_multiple", "2_0_0_0") == CODECS_MAPPING['ZSTD'] assert node1.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '2_0_0_0'") == "ZSTD(10)\n" assert node2.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '2_0_0_0'") == "ZSTD(10)\n" assert get_compression_codec_byte(node1, "compression_table_multiple", "3_0_0_0") == CODECS_MAPPING['Multiple'] assert get_second_multiple_codec_byte(node1, "compression_table_multiple", "3_0_0_0") == CODECS_MAPPING['ZSTD'] assert node1.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '3_0_0_0'") == "ZSTD(10)\n" assert node2.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '3_0_0_0'") == "ZSTD(10)\n" node2.query("SYSTEM SYNC REPLICA compression_table_multiple", timeout=15) node1.query("OPTIMIZE TABLE compression_table_multiple FINAL") assert get_compression_codec_byte(node1, "compression_table_multiple", "1_0_0_1") == CODECS_MAPPING['Multiple'] assert get_second_multiple_codec_byte(node1, "compression_table_multiple", "1_0_0_1") == CODECS_MAPPING['ZSTD'] assert node1.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '1_0_0_1'") == "ZSTD(10)\n" assert node2.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '1_0_0_1'") == "ZSTD(10)\n" assert get_compression_codec_byte(node1, "compression_table_multiple", "2_0_0_1") == CODECS_MAPPING['Multiple'] assert get_second_multiple_codec_byte(node1, "compression_table_multiple", "2_0_0_1") == CODECS_MAPPING['LZ4HC'] assert node1.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '2_0_0_1'") == "LZ4HC(5)\n" assert node2.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '2_0_0_1'") == "LZ4HC(5)\n" assert get_compression_codec_byte(node1, "compression_table_multiple", "3_0_0_1") == CODECS_MAPPING['Multiple'] assert get_second_multiple_codec_byte(node1, "compression_table_multiple", "3_0_0_1") == CODECS_MAPPING['LZ4'] assert node1.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '3_0_0_1'") == "LZ4\n" assert node2.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '3_0_0_1'") == "LZ4\n" assert node1.query("SELECT COUNT() FROM compression_table_multiple") == "3\n" assert node2.query("SELECT COUNT() FROM compression_table_multiple") == "3\n" node1.query("DROP TABLE compression_table_multiple SYNC") node2.query("DROP TABLE compression_table_multiple SYNC") def test_default_codec_version_update(start_cluster): node3.query(""" CREATE TABLE compression_table ( key UInt64 CODEC(LZ4HC(7)), data1 String ) ENGINE = MergeTree ORDER BY tuple() PARTITION BY key; """) node3.query("INSERT INTO compression_table VALUES (1, 'x')") node3.query("INSERT INTO compression_table VALUES (2, '{}')".format(get_random_string(2048))) node3.query("INSERT INTO compression_table VALUES (3, '{}')".format(get_random_string(22048))) old_version = node3.query("SELECT version()") node3.restart_with_latest_version() new_version = node3.query("SELECT version()") logging.debug(f"Updated from {old_version} to {new_version}") assert node3.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '1_1_1_0'") == "ZSTD(1)\n" assert node3.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_2_2_0'") == "ZSTD(1)\n" assert node3.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_3_3_0'") == "ZSTD(1)\n" node3.query("OPTIMIZE TABLE compression_table FINAL") assert node3.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '1_1_1_1'") == "ZSTD(10)\n" assert node3.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_2_2_1'") == "LZ4HC(5)\n" assert node3.query( "SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_3_3_1'") == "LZ4\n" node3.query("DROP TABLE compression_table SYNC") def callback(n): n.exec_in_container(['bash', '-c', 'rm -rf /var/lib/clickhouse/metadata/system /var/lib/clickhouse/data/system '], user='root') node3.restart_with_original_version(callback_onstop=callback) cur_version = node3.query("SELECT version()") logging.debug(f"End with {cur_version}") def test_default_codec_for_compact_parts(start_cluster): node4.query(""" CREATE TABLE compact_parts_table ( key UInt64, data String ) ENGINE MergeTree ORDER BY tuple() """) node4.query("INSERT INTO compact_parts_table VALUES (1, 'Hello world')") assert node4.query("SELECT COUNT() FROM compact_parts_table") == "1\n" node4.query("ALTER TABLE compact_parts_table DETACH PART 'all_1_1_0'") node4.exec_in_container(["bash", "-c", "rm /var/lib/clickhouse/data/default/compact_parts_table/detached/all_1_1_0/default_compression_codec.txt"]) node4.query("ALTER TABLE compact_parts_table ATTACH PART 'all_1_1_0'") assert node4.query("SELECT COUNT() FROM compact_parts_table") == "1\n" node4.query("DETACH TABLE compact_parts_table") node4.query("ATTACH TABLE compact_parts_table") assert node4.query("SELECT COUNT() FROM compact_parts_table") == "1\n" node4.query("DROP TABLE compact_parts_table SYNC")