mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-16 03:12:43 +00:00
522 lines
16 KiB
Python
522 lines
16 KiB
Python
import random
|
|
import string
|
|
import logging
|
|
import pytest
|
|
import time
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
|
|
node1 = cluster.add_instance(
|
|
"node1",
|
|
main_configs=["configs/default_compression.xml", "configs/wide_parts_only.xml"],
|
|
with_zookeeper=True,
|
|
)
|
|
node2 = cluster.add_instance(
|
|
"node2",
|
|
main_configs=["configs/default_compression.xml", "configs/wide_parts_only.xml"],
|
|
with_zookeeper=True,
|
|
)
|
|
node3 = cluster.add_instance(
|
|
"node3",
|
|
main_configs=["configs/default_compression.xml", "configs/wide_parts_only.xml"],
|
|
image="yandex/clickhouse-server",
|
|
tag="20.3.16",
|
|
stay_alive=True,
|
|
with_installed_binary=True,
|
|
)
|
|
node4 = cluster.add_instance("node4")
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def start_cluster():
|
|
try:
|
|
cluster.start()
|
|
|
|
yield cluster
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def get_compression_codec_byte(node, table_name, part_name):
|
|
cmd = "tail -c +17 /var/lib/clickhouse/data/default/{}/{}/data1.bin | od -x -N 1 | head -n 1 | awk '{{print $2}}'".format(
|
|
table_name, part_name
|
|
)
|
|
return node.exec_in_container(["bash", "-c", cmd]).strip()
|
|
|
|
|
|
def get_second_multiple_codec_byte(node, table_name, part_name):
|
|
cmd = "tail -c +17 /var/lib/clickhouse/data/default/{}/{}/data1.bin | od -x -j 11 -N 1 | head -n 1 | awk '{{print $2}}'".format(
|
|
table_name, part_name
|
|
)
|
|
return node.exec_in_container(["bash", "-c", cmd]).strip()
|
|
|
|
|
|
def get_random_string(length):
|
|
return "".join(
|
|
random.choice(string.ascii_uppercase + string.digits) for _ in range(length)
|
|
)
|
|
|
|
|
|
CODECS_MAPPING = {
|
|
"LZ4": "0082",
|
|
"LZ4HC": "0082", # not an error, same byte
|
|
"ZSTD": "0090",
|
|
"Multiple": "0091",
|
|
}
|
|
|
|
|
|
def test_default_codec_single(start_cluster):
|
|
for i, node in enumerate([node1, node2]):
|
|
node.query(
|
|
"""
|
|
CREATE TABLE compression_table (
|
|
key UInt64,
|
|
data1 String CODEC(Default)
|
|
) ENGINE = ReplicatedMergeTree('/t', '{}') ORDER BY tuple() PARTITION BY key;
|
|
""".format(
|
|
i
|
|
)
|
|
)
|
|
|
|
# ZSTD(10) and ZSTD(10) after merge
|
|
node1.query("INSERT INTO compression_table VALUES (1, 'x')")
|
|
|
|
# ZSTD(10) and LZ4HC(10) after merge
|
|
node1.query(
|
|
"INSERT INTO compression_table VALUES (2, '{}')".format(get_random_string(2048))
|
|
)
|
|
|
|
# ZSTD(10) and LZ4 after merge
|
|
node1.query(
|
|
"INSERT INTO compression_table VALUES (3, '{}')".format(
|
|
get_random_string(22048)
|
|
)
|
|
)
|
|
|
|
node2.query("SYSTEM SYNC REPLICA compression_table", timeout=15)
|
|
|
|
# to reload parts
|
|
node1.query("DETACH TABLE compression_table")
|
|
node2.query("DETACH TABLE compression_table")
|
|
|
|
node1.query("ATTACH TABLE compression_table")
|
|
node2.query("ATTACH TABLE compression_table")
|
|
|
|
node1.query("SYSTEM FLUSH LOGS")
|
|
node2.query("SYSTEM FLUSH LOGS")
|
|
|
|
# Same codec for all
|
|
assert (
|
|
get_compression_codec_byte(node1, "compression_table", "1_0_0_0")
|
|
== CODECS_MAPPING["ZSTD"]
|
|
)
|
|
assert (
|
|
node1.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '1_0_0_0'"
|
|
)
|
|
== "ZSTD(10)\n"
|
|
)
|
|
assert (
|
|
node2.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '1_0_0_0'"
|
|
)
|
|
== "ZSTD(10)\n"
|
|
)
|
|
|
|
assert (
|
|
get_compression_codec_byte(node1, "compression_table", "2_0_0_0")
|
|
== CODECS_MAPPING["ZSTD"]
|
|
)
|
|
assert (
|
|
node1.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_0_0_0'"
|
|
)
|
|
== "ZSTD(10)\n"
|
|
)
|
|
assert (
|
|
node2.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_0_0_0'"
|
|
)
|
|
== "ZSTD(10)\n"
|
|
)
|
|
|
|
assert (
|
|
get_compression_codec_byte(node1, "compression_table", "3_0_0_0")
|
|
== CODECS_MAPPING["ZSTD"]
|
|
)
|
|
assert (
|
|
node1.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_0_0_0'"
|
|
)
|
|
== "ZSTD(10)\n"
|
|
)
|
|
assert (
|
|
node2.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_0_0_0'"
|
|
)
|
|
== "ZSTD(10)\n"
|
|
)
|
|
|
|
# just to be sure that replication works
|
|
node1.query("OPTIMIZE TABLE compression_table FINAL")
|
|
|
|
node2.query("SYSTEM SYNC REPLICA compression_table", timeout=15)
|
|
|
|
# to reload parts
|
|
node1.query("DETACH TABLE compression_table")
|
|
node2.query("DETACH TABLE compression_table")
|
|
|
|
node1.query("ATTACH TABLE compression_table")
|
|
node2.query("ATTACH TABLE compression_table")
|
|
|
|
node1.query("SYSTEM FLUSH LOGS")
|
|
node2.query("SYSTEM FLUSH LOGS")
|
|
|
|
assert (
|
|
get_compression_codec_byte(node1, "compression_table", "1_0_0_1")
|
|
== CODECS_MAPPING["ZSTD"]
|
|
)
|
|
assert (
|
|
node1.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '1_0_0_1'"
|
|
)
|
|
== "ZSTD(10)\n"
|
|
)
|
|
assert (
|
|
node2.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '1_0_0_1'"
|
|
)
|
|
== "ZSTD(10)\n"
|
|
)
|
|
|
|
assert (
|
|
get_compression_codec_byte(node1, "compression_table", "2_0_0_1")
|
|
== CODECS_MAPPING["LZ4HC"]
|
|
)
|
|
assert (
|
|
node1.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_0_0_1'"
|
|
)
|
|
== "LZ4HC(5)\n"
|
|
)
|
|
assert (
|
|
node2.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_0_0_1'"
|
|
)
|
|
== "LZ4HC(5)\n"
|
|
)
|
|
|
|
assert (
|
|
get_compression_codec_byte(node1, "compression_table", "3_0_0_1")
|
|
== CODECS_MAPPING["LZ4"]
|
|
)
|
|
assert (
|
|
node1.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_0_0_1'"
|
|
)
|
|
== "LZ4\n"
|
|
)
|
|
assert (
|
|
node2.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_0_0_1'"
|
|
)
|
|
== "LZ4\n"
|
|
)
|
|
|
|
assert node1.query("SELECT COUNT() FROM compression_table") == "3\n"
|
|
assert node2.query("SELECT COUNT() FROM compression_table") == "3\n"
|
|
|
|
node1.query("DROP TABLE compression_table SYNC")
|
|
node2.query("DROP TABLE compression_table SYNC")
|
|
|
|
|
|
def test_default_codec_multiple(start_cluster):
|
|
for i, node in enumerate([node1, node2]):
|
|
node.query(
|
|
"""
|
|
CREATE TABLE compression_table_multiple (
|
|
key UInt64,
|
|
data1 String CODEC(NONE, Default)
|
|
) ENGINE = ReplicatedMergeTree('/d', '{}') ORDER BY tuple() PARTITION BY key;
|
|
""".format(
|
|
i
|
|
),
|
|
settings={"allow_suspicious_codecs": 1},
|
|
)
|
|
|
|
# ZSTD(10) and ZSTD(10) after merge
|
|
node1.query("INSERT INTO compression_table_multiple VALUES (1, 'x')")
|
|
|
|
# ZSTD(10) and LZ4HC(10) after merge
|
|
node1.query(
|
|
"INSERT INTO compression_table_multiple VALUES (2, '{}')".format(
|
|
get_random_string(2048)
|
|
)
|
|
)
|
|
|
|
# ZSTD(10) and LZ4 after merge
|
|
node1.query(
|
|
"INSERT INTO compression_table_multiple VALUES (3, '{}')".format(
|
|
get_random_string(22048)
|
|
)
|
|
)
|
|
|
|
node2.query("SYSTEM SYNC REPLICA compression_table_multiple", timeout=15)
|
|
|
|
# Same codec for all
|
|
assert (
|
|
get_compression_codec_byte(node1, "compression_table_multiple", "1_0_0_0")
|
|
== CODECS_MAPPING["Multiple"]
|
|
)
|
|
assert (
|
|
get_second_multiple_codec_byte(node1, "compression_table_multiple", "1_0_0_0")
|
|
== CODECS_MAPPING["ZSTD"]
|
|
)
|
|
assert (
|
|
node1.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '1_0_0_0'"
|
|
)
|
|
== "ZSTD(10)\n"
|
|
)
|
|
assert (
|
|
node2.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '1_0_0_0'"
|
|
)
|
|
== "ZSTD(10)\n"
|
|
)
|
|
|
|
assert (
|
|
get_compression_codec_byte(node1, "compression_table_multiple", "2_0_0_0")
|
|
== CODECS_MAPPING["Multiple"]
|
|
)
|
|
assert (
|
|
get_second_multiple_codec_byte(node1, "compression_table_multiple", "2_0_0_0")
|
|
== CODECS_MAPPING["ZSTD"]
|
|
)
|
|
assert (
|
|
node1.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '2_0_0_0'"
|
|
)
|
|
== "ZSTD(10)\n"
|
|
)
|
|
assert (
|
|
node2.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '2_0_0_0'"
|
|
)
|
|
== "ZSTD(10)\n"
|
|
)
|
|
|
|
assert (
|
|
get_compression_codec_byte(node1, "compression_table_multiple", "3_0_0_0")
|
|
== CODECS_MAPPING["Multiple"]
|
|
)
|
|
assert (
|
|
get_second_multiple_codec_byte(node1, "compression_table_multiple", "3_0_0_0")
|
|
== CODECS_MAPPING["ZSTD"]
|
|
)
|
|
assert (
|
|
node1.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '3_0_0_0'"
|
|
)
|
|
== "ZSTD(10)\n"
|
|
)
|
|
assert (
|
|
node2.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '3_0_0_0'"
|
|
)
|
|
== "ZSTD(10)\n"
|
|
)
|
|
|
|
node2.query("SYSTEM SYNC REPLICA compression_table_multiple", timeout=15)
|
|
|
|
node1.query("OPTIMIZE TABLE compression_table_multiple FINAL")
|
|
|
|
node2.query("SYSTEM SYNC REPLICA compression_table_multiple", timeout=15)
|
|
|
|
assert (
|
|
get_compression_codec_byte(node1, "compression_table_multiple", "1_0_0_1")
|
|
== CODECS_MAPPING["Multiple"]
|
|
)
|
|
assert (
|
|
get_second_multiple_codec_byte(node1, "compression_table_multiple", "1_0_0_1")
|
|
== CODECS_MAPPING["ZSTD"]
|
|
)
|
|
assert (
|
|
node1.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '1_0_0_1'"
|
|
)
|
|
== "ZSTD(10)\n"
|
|
)
|
|
assert (
|
|
node2.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '1_0_0_1'"
|
|
)
|
|
== "ZSTD(10)\n"
|
|
)
|
|
|
|
assert (
|
|
get_compression_codec_byte(node1, "compression_table_multiple", "2_0_0_1")
|
|
== CODECS_MAPPING["Multiple"]
|
|
)
|
|
assert (
|
|
get_second_multiple_codec_byte(node1, "compression_table_multiple", "2_0_0_1")
|
|
== CODECS_MAPPING["LZ4HC"]
|
|
)
|
|
assert (
|
|
node1.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '2_0_0_1'"
|
|
)
|
|
== "LZ4HC(5)\n"
|
|
)
|
|
assert (
|
|
node2.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '2_0_0_1'"
|
|
)
|
|
== "LZ4HC(5)\n"
|
|
)
|
|
|
|
assert (
|
|
get_compression_codec_byte(node1, "compression_table_multiple", "3_0_0_1")
|
|
== CODECS_MAPPING["Multiple"]
|
|
)
|
|
assert (
|
|
get_second_multiple_codec_byte(node1, "compression_table_multiple", "3_0_0_1")
|
|
== CODECS_MAPPING["LZ4"]
|
|
)
|
|
assert (
|
|
node1.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '3_0_0_1'"
|
|
)
|
|
== "LZ4\n"
|
|
)
|
|
assert (
|
|
node2.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table_multiple' and name = '3_0_0_1'"
|
|
)
|
|
== "LZ4\n"
|
|
)
|
|
|
|
assert node1.query("SELECT COUNT() FROM compression_table_multiple") == "3\n"
|
|
assert node2.query("SELECT COUNT() FROM compression_table_multiple") == "3\n"
|
|
|
|
node1.query("DROP TABLE compression_table_multiple SYNC")
|
|
node2.query("DROP TABLE compression_table_multiple SYNC")
|
|
|
|
|
|
def test_default_codec_version_update(start_cluster):
|
|
node3.query(
|
|
"""
|
|
CREATE TABLE compression_table (
|
|
key UInt64 CODEC(LZ4HC(7)),
|
|
data1 String
|
|
) ENGINE = MergeTree ORDER BY tuple() PARTITION BY key;
|
|
"""
|
|
)
|
|
|
|
node3.query("INSERT INTO compression_table VALUES (1, 'x')")
|
|
node3.query(
|
|
"INSERT INTO compression_table VALUES (2, '{}')".format(get_random_string(2048))
|
|
)
|
|
node3.query(
|
|
"INSERT INTO compression_table VALUES (3, '{}')".format(
|
|
get_random_string(22048)
|
|
)
|
|
)
|
|
|
|
old_version = node3.query("SELECT version()")
|
|
node3.restart_with_latest_version(fix_metadata=True)
|
|
new_version = node3.query("SELECT version()")
|
|
logging.debug(f"Updated from {old_version} to {new_version}")
|
|
assert (
|
|
node3.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '1_1_1_0'"
|
|
)
|
|
== "ZSTD(1)\n"
|
|
)
|
|
assert (
|
|
node3.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_2_2_0'"
|
|
)
|
|
== "ZSTD(1)\n"
|
|
)
|
|
assert (
|
|
node3.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_3_3_0'"
|
|
)
|
|
== "ZSTD(1)\n"
|
|
)
|
|
|
|
node3.query("OPTIMIZE TABLE compression_table FINAL")
|
|
|
|
assert (
|
|
node3.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '1_1_1_1'"
|
|
)
|
|
== "ZSTD(10)\n"
|
|
)
|
|
assert (
|
|
node3.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_2_2_1'"
|
|
)
|
|
== "LZ4HC(5)\n"
|
|
)
|
|
assert (
|
|
node3.query(
|
|
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_3_3_1'"
|
|
)
|
|
== "LZ4\n"
|
|
)
|
|
|
|
node3.query("DROP TABLE compression_table SYNC")
|
|
|
|
def callback(n):
|
|
n.exec_in_container(
|
|
[
|
|
"bash",
|
|
"-c",
|
|
"rm -rf /var/lib/clickhouse/metadata/system /var/lib/clickhouse/data/system ",
|
|
],
|
|
user="root",
|
|
)
|
|
|
|
node3.restart_with_original_version(callback_onstop=callback)
|
|
|
|
cur_version = node3.query("SELECT version()")
|
|
logging.debug(f"End with {cur_version}")
|
|
|
|
|
|
def test_default_codec_for_compact_parts(start_cluster):
|
|
node4.query(
|
|
"""
|
|
CREATE TABLE compact_parts_table (
|
|
key UInt64,
|
|
data String
|
|
)
|
|
ENGINE MergeTree ORDER BY tuple()
|
|
"""
|
|
)
|
|
|
|
node4.query("INSERT INTO compact_parts_table VALUES (1, 'Hello world')")
|
|
assert node4.query("SELECT COUNT() FROM compact_parts_table") == "1\n"
|
|
|
|
node4.query("ALTER TABLE compact_parts_table DETACH PART 'all_1_1_0'")
|
|
|
|
node4.exec_in_container(
|
|
[
|
|
"bash",
|
|
"-c",
|
|
"rm /var/lib/clickhouse/data/default/compact_parts_table/detached/all_1_1_0/default_compression_codec.txt",
|
|
]
|
|
)
|
|
|
|
node4.query("ALTER TABLE compact_parts_table ATTACH PART 'all_1_1_0'")
|
|
|
|
assert node4.query("SELECT COUNT() FROM compact_parts_table") == "1\n"
|
|
|
|
node4.query("DETACH TABLE compact_parts_table")
|
|
node4.query("ATTACH TABLE compact_parts_table")
|
|
|
|
assert node4.query("SELECT COUNT() FROM compact_parts_table") == "1\n"
|
|
node4.query("DROP TABLE compact_parts_table SYNC")
|