mirror of
synced 2024-11-17 21:24:28 +00:00
This PR formats all the `*.py` files found under the `tests/integration` folder. It also reorders the imports and removes a number of unused ones. In addition, the formatting wraps long lines and fixes spacing and indentation so that the tests are more readable.
530 lines
23 KiB
530 lines
23 KiB
import os
import random
import string
import struct
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import TSV
from helpers.test_tools import assert_eq_with_retry
# Shared cluster object for this test module; the node instances below are registered on it
# through cluster.add_instance(...).
cluster = ClickHouseCluster(__file__)
def get_random_array():
    """Return a list of random length (0..1000) holding random integers in [0, 999]."""
    # The length is drawn first, then the elements, so a seeded generator yields a stable result.
    length = random.randint(0, 1000)
    # The modulo folds the inclusive upper bound 1000 back onto 0, keeping every value below 1000.
    return [random.randint(0, 1000) % 1000 for _ in range(length)]
def get_random_string():
    """Return a random string of 0..1000 characters drawn from uppercase ASCII letters and digits."""
    alphabet = string.ascii_uppercase + string.digits
    # The length is drawn before any character, matching the order a seeded generator would replay.
    length = random.randint(0, 1000)
    return ''.join(random.choice(alphabet) for _ in range(length))
def insert_random_data(table, node, size):
    """Insert `size` randomly generated rows into `table` with a single INSERT issued on `node`.

    Args:
        table: name of the destination table.
        node: object exposing `query(sql)`; the INSERT statement is sent through it.
        size: number of value tuples to generate.

    NOTE(review): every generated tuple holds only a quoted random string and an array literal.
    Confirm that this matches the column list of the destination tables before relying on it.
    """
    rows = [
        '(' + ','.join((
            "'" + get_random_string() + "'",
            str(get_random_array()),
        )) + ')'
        for _ in range(size)
    ]
    node.query("INSERT INTO {} VALUES {}".format(table, ','.join(rows)))
def create_tables(name, nodes, node_settings, shard):
    """Create one ReplicatedMergeTree replica of table `name` on each of `nodes`.

    Args:
        name: table name; also used as the last component of the ZooKeeper path.
        nodes: objects exposing `query(sql)`; the position in this sequence becomes the replica name.
        node_settings: one mapping per node providing `index_granularity_bytes`,
            `min_rows_for_wide_part` and `min_rows_for_compact_part`.
        shard: shard identifier placed in the ZooKeeper path.

    Raises:
        KeyError: if a settings mapping lacks one of the placeholders used by the template.
    """
    # zip() pairs every node with its own settings; enumerate() supplies the replica index.
    for replica, (node, settings) in enumerate(zip(nodes, node_settings)):
        node.query(
            '''
            CREATE TABLE {name}(date Date, id UInt32, s String, arr Array(Int32))
            ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/{shard}/{name}', '{repl}')
            SETTINGS index_granularity = 64, index_granularity_bytes = {index_granularity_bytes},
            min_rows_for_wide_part = {min_rows_for_wide_part}, min_rows_for_compact_part = {min_rows_for_compact_part},
            in_memory_parts_enable_wal = 1
            '''.format(name=name, shard=shard, repl=replica, **settings))
def create_tables_old_format(name, nodes, shard):
    """Create one replica of table `name` on each node using the positional ReplicatedMergeTree syntax.

    Args:
        name: table name; also used as the last component of the ZooKeeper path.
        nodes: objects exposing `query(sql)`; the position in this sequence becomes the replica name.
        shard: shard identifier placed in the ZooKeeper path.
    """
    for replica, node in enumerate(nodes):
        node.query(
            '''
            CREATE TABLE {name}(date Date, id UInt32, s String, arr Array(Int32))
            ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/{shard}/{name}', '{repl}', date, id, 64)
            '''.format(name=name, shard=shard, repl=replica))
# Cluster instances (node1-node6, node9-node12) and the per-table settings dictionaries used by
# the tests in this module. The settings_* dictionaries are handed to create_tables(), which expands
# them as keyword arguments into its CREATE TABLE template.
# NOTE(review): the add_instance(...) calls for node1, node2 and node3 end on a trailing comma with
# no closing parenthesis in this view -- a continuation line appears to be missing for each of them;
# confirm against the complete file before relying on these definitions.
node1 = cluster.add_instance('node1', main_configs=[], user_configs=["configs/users.d/not_optimize_count.xml"],
node2 = cluster.add_instance('node2', main_configs=[], user_configs=["configs/users.d/not_optimize_count.xml"],
settings_default = {'index_granularity_bytes': 10485760, 'min_rows_for_wide_part': 512, 'min_rows_for_compact_part': 0}
settings_compact_only = {'index_granularity_bytes': 10485760, 'min_rows_for_wide_part': 1000000,
'min_rows_for_compact_part': 0}
settings_not_adaptive = {'index_granularity_bytes': 0, 'min_rows_for_wide_part': 512, 'min_rows_for_compact_part': 0}
node3 = cluster.add_instance('node3', main_configs=[], user_configs=["configs/users.d/not_optimize_count.xml"],
node4 = cluster.add_instance('node4', user_configs=["configs/users.d/not_optimize_count.xml"],
main_configs=['configs/no_leader.xml'], with_zookeeper=True)
settings_compact = {'index_granularity_bytes': 10485760, 'min_rows_for_wide_part': 512, 'min_rows_for_compact_part': 0}
settings_wide = {'index_granularity_bytes': 10485760, 'min_rows_for_wide_part': 0, 'min_rows_for_compact_part': 0}
node5 = cluster.add_instance('node5', main_configs=['configs/compact_parts.xml'], with_zookeeper=True)
node6 = cluster.add_instance('node6', main_configs=['configs/compact_parts.xml'], with_zookeeper=True)
settings_in_memory = {'index_granularity_bytes': 10485760, 'min_rows_for_wide_part': 512,
'min_rows_for_compact_part': 256}
node9 = cluster.add_instance('node9', with_zookeeper=True, stay_alive=True)
node10 = cluster.add_instance('node10', with_zookeeper=True)
node11 = cluster.add_instance('node11', main_configs=['configs/do_not_merge.xml'], with_zookeeper=True, stay_alive=True)
node12 = cluster.add_instance('node12', main_configs=['configs/do_not_merge.xml'], with_zookeeper=True, stay_alive=True)
# Creates every replicated table used by the tests of this module (grouped by shard name) and then
# yields the shared `cluster` object.
# NOTE(review): the tests request `start_cluster` as an argument, yet no fixture decorator and no
# cluster start/shutdown calls are visible here, and the body carries no indentation -- lines appear
# to be missing from this view; confirm against the complete file.
def start_cluster():
create_tables('polymorphic_table', [node1, node2], [settings_default, settings_default], "shard1")
create_tables('compact_parts_only', [node1, node2], [settings_compact_only, settings_compact_only], "shard1")
create_tables('non_adaptive_table', [node1, node2], [settings_not_adaptive, settings_not_adaptive], "shard1")
create_tables('polymorphic_table_compact', [node3, node4], [settings_compact, settings_wide], "shard2")
create_tables('polymorphic_table_wide', [node3, node4], [settings_wide, settings_compact], "shard2")
create_tables_old_format('polymorphic_table', [node5, node6], "shard3")
create_tables('in_memory_table', [node9, node10], [settings_in_memory, settings_in_memory], "shard4")
create_tables('wal_table', [node11, node12], [settings_in_memory, settings_in_memory], "shard4")
create_tables('restore_table', [node11, node12], [settings_in_memory, settings_in_memory], "shard5")
create_tables('deduplication_table', [node9, node10], [settings_in_memory, settings_in_memory], "shard5")
create_tables('sync_table', [node9, node10], [settings_in_memory, settings_in_memory], "shard5")
create_tables('alters_table', [node9, node10], [settings_in_memory, settings_in_memory], "shard5")
yield cluster
@pytest.mark.parametrize(
    ('first_node', 'second_node'),
    [
        (node1, node2),  # compact parts
        (node5, node6),  # compact parts, old-format
    ]
)
def test_polymorphic_parts_basics(start_cluster, first_node, second_node):
    """Check part types, merges, ALTERs and mutations of `polymorphic_table` on a replica pair.

    Args:
        start_cluster: cluster set-up dependency; not used directly in the body.
        first_node: replica that receives the initial inserts, the OPTIMIZE and the ALTERs.
        second_node: replica that is synchronised from `first_node`.
    """
    count_rows = "SELECT count() FROM polymorphic_table"
    parts_by_type = ("SELECT part_type, count() FROM system.parts "
                     "WHERE table = 'polymorphic_table' AND active GROUP BY part_type ORDER BY part_type")
    distinct_part_types = "SELECT DISTINCT part_type FROM system.parts WHERE table = 'polymorphic_table' AND active"
    sync_replica = "SYSTEM SYNC REPLICA polymorphic_table"

    # With merges stopped every insert stays a separate part, so the part types can be counted.
    first_node.query("SYSTEM STOP MERGES")
    second_node.query("SYSTEM STOP MERGES")

    for size in [300, 300, 600]:
        insert_random_data('polymorphic_table', first_node, size)
    second_node.query(sync_replica, timeout=20)

    assert first_node.query(count_rows) == "1200\n"
    assert second_node.query(count_rows) == "1200\n"

    expected = "Compact\t2\nWide\t1\n"
    assert TSV(first_node.query(parts_by_type)) == TSV(expected)
    assert TSV(second_node.query(parts_by_type)) == TSV(expected)

    first_node.query("SYSTEM START MERGES")
    second_node.query("SYSTEM START MERGES")

    # 40 rounds of 10 rows on each replica add 800 rows to the 1200 already present.
    for _ in range(40):
        insert_random_data('polymorphic_table', first_node, 10)
        insert_random_data('polymorphic_table', second_node, 10)

    first_node.query(sync_replica, timeout=20)
    second_node.query(sync_replica, timeout=20)

    assert first_node.query(count_rows) == "2000\n"
    assert second_node.query(count_rows) == "2000\n"

    first_node.query("OPTIMIZE TABLE polymorphic_table FINAL")
    second_node.query(sync_replica, timeout=20)

    assert first_node.query(count_rows) == "2000\n"
    assert second_node.query(count_rows) == "2000\n"

    assert first_node.query(distinct_part_types) == "Wide\n"
    assert second_node.query(distinct_part_types) == "Wide\n"

    # Check alters and mutations also work
    first_node.query("ALTER TABLE polymorphic_table ADD COLUMN ss String")
    first_node.query("ALTER TABLE polymorphic_table UPDATE ss = toString(id) WHERE 1")

    second_node.query(sync_replica, timeout=20)

    # These comparisons used to be bare expressions and therefore never checked anything.
    # The UPDATE is a mutation and may still be running when queried, so the value that depends
    # on it is polled with the retry helper instead of being asserted once.
    assert first_node.query("SELECT count(ss) FROM polymorphic_table") == "2000\n"
    assert_eq_with_retry(first_node, "SELECT uniqExact(ss) FROM polymorphic_table", "600\n")

    assert second_node.query("SELECT count(ss) FROM polymorphic_table") == "2000\n"
    assert_eq_with_retry(second_node, "SELECT uniqExact(ss) FROM polymorphic_table", "600\n")
# Checks mostly that merge from compact part to compact part works.
def test_compact_parts_only(start_cluster):
    """Check that `compact_parts_only` holds only Compact parts before and after a final merge.

    Args:
        start_cluster: cluster set-up dependency; not used directly in the body.
    """
    count_rows = "SELECT count() FROM compact_parts_only"
    distinct_part_types = "SELECT DISTINCT part_type FROM system.parts WHERE table = 'compact_parts_only' AND active"
    parts_by_type = ("SELECT part_type, count() FROM system.parts "
                     "WHERE table = 'compact_parts_only' AND active GROUP BY part_type ORDER BY part_type")

    # 20 rounds of 100 rows on each of the two replicas give the 4000 rows asserted below.
    for _ in range(20):
        insert_random_data('compact_parts_only', node1, 100)
        insert_random_data('compact_parts_only', node2, 100)

    node1.query("SYSTEM SYNC REPLICA compact_parts_only", timeout=20)
    node2.query("SYSTEM SYNC REPLICA compact_parts_only", timeout=20)

    assert node1.query(count_rows) == "4000\n"
    assert node2.query(count_rows) == "4000\n"

    assert node1.query(distinct_part_types) == "Compact\n"
    assert node2.query(distinct_part_types) == "Compact\n"

    node1.query("OPTIMIZE TABLE compact_parts_only FINAL")
    node2.query("SYSTEM SYNC REPLICA compact_parts_only", timeout=20)
    assert node2.query(count_rows) == "4000\n"

    # After the final merge a single Compact part must remain on both replicas.
    expected = "Compact\t1\n"
    assert TSV(node1.query(parts_by_type)) == TSV(expected)
    assert TSV(node2.query(parts_by_type)) == TSV(expected)
# Check that follower replicas create parts of the same type, which leader has chosen at merge.
@pytest.mark.parametrize(
    ('table', 'part_type'),
    [
        ('polymorphic_table_compact', 'Compact'),
        ('polymorphic_table_wide', 'Wide')
    ]
)
def test_different_part_types_on_replicas(start_cluster, table, part_type):
    """Check that the merged part has type `part_type` on both the leader and the follower.

    Args:
        start_cluster: cluster set-up dependency; not used directly in the body.
        table: name of the replicated table under test.
        part_type: part type expected for the single active part after the final merge.
    """
    leader = node3
    follower = node4

    is_leader_query = "SELECT is_leader FROM system.replicas WHERE table = '{}'".format(table)
    parts_by_type = ("SELECT part_type, count() FROM system.parts "
                     "WHERE table = '{}' AND active GROUP BY part_type ORDER BY part_type".format(table))

    assert leader.query(is_leader_query) == "1\n"
    # Uses the `follower` alias; the original referenced node4 directly here.
    assert follower.query(is_leader_query) == "0\n"

    for _ in range(3):
        insert_random_data(table, leader, 100)

    leader.query("OPTIMIZE TABLE {} FINAL".format(table))
    follower.query("SYSTEM SYNC REPLICA {}".format(table), timeout=20)

    expected = "{}\t1\n".format(part_type)

    assert TSV(leader.query(parts_by_type)) == TSV(expected)
    assert TSV(follower.query(parts_by_type)) == TSV(expected)
# Instances and settings for the cross-version replication tests. node7 is declared with the
# 'yandex/clickhouse-server' image; node8 is declared without an explicit image.
# NOTE(review): the add_instance(...) call for node7 ends on a trailing comma with no closing
# parenthesis in this view, and tag='' is an empty string -- a continuation line and the intended
# version tag appear to be missing; confirm against the complete file.
node7 = cluster.add_instance('node7', user_configs=["configs_old/users.d/not_optimize_count.xml"], with_zookeeper=True,
image='yandex/clickhouse-server', tag='', stay_alive=True,
node8 = cluster.add_instance('node8', user_configs=["configs/users.d/not_optimize_count.xml"], with_zookeeper=True)
settings7 = {'index_granularity_bytes': 10485760}
settings8 = {'index_granularity_bytes': 10485760, 'min_rows_for_wide_part': 512, 'min_rows_for_compact_part': 0}
# Builds, for each of 'polymorphic_table' and 'polymorphic_table_2', two CREATE TABLE statements
# (replica '1' formatted with settings7, replica '2' formatted with settings8) and yields `cluster`.
# NOTE(review): both SQL templates lack the statement and opening quote that should start them, and
# no cluster start-up is visible -- lines appear to be missing from this view; confirm against the
# complete file.
# NOTE(review): the second template references {min_bytes_for_wide_part}, which is not a key of
# settings8 as defined in this file; str.format would raise KeyError when that template is evaluated.
def start_cluster_diff_versions():
for name in ['polymorphic_table', 'polymorphic_table_2']:
CREATE TABLE {name}(date Date, id UInt32, s String, arr Array(Int32))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/shard5/{name}', '1')
SETTINGS index_granularity = 64, index_granularity_bytes = {index_granularity_bytes}
'''.format(name=name, **settings7)
CREATE TABLE {name}(date Date, id UInt32, s String, arr Array(Int32))
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/shard5/{name}', '2')
SETTINGS index_granularity = 64, index_granularity_bytes = {index_granularity_bytes},
min_rows_for_wide_part = {min_rows_for_wide_part}, min_bytes_for_wide_part = {min_bytes_for_wide_part}
'''.format(name=name, **settings8)
yield cluster
@pytest.mark.skip(reason="compatability is temporary broken")
def test_polymorphic_parts_diff_versions(start_cluster_diff_versions):
    """Check that replication with Wide parts works between different versions.

    Rows are inserted through node7 and fetched by node8, whose active parts must all be Wide.

    Args:
        start_cluster_diff_versions: cluster set-up dependency; not used directly in the body.
    """
    # The aliases were previously assigned but never used; they now name the roles consistently
    # with test_polymorphic_parts_diff_versions_2.
    node_old = node7
    node_new = node8

    insert_random_data('polymorphic_table', node_old, 100)
    node_new.query("SYSTEM SYNC REPLICA polymorphic_table", timeout=20)

    assert node_new.query("SELECT count() FROM polymorphic_table") == "100\n"
    assert node_new.query(
        "SELECT DISTINCT part_type FROM system.parts WHERE table = 'polymorphic_table' and active") == "Wide\n"
# Inserts 100 rows through node8 (node_new), checks that node7 (node_old) sees none of them and that
# a short SYNC REPLICA on node_old raises, then expects a second SYNC REPLICA to succeed and the
# data to be present on node_old as a Compact part.
# NOTE(review): the first SYNC REPLICA is expected to raise while the second is expected to work
# "after update", yet no update/restart step is visible between them and the body carries no
# indentation -- a statement appears to be missing from this view; confirm against the complete file.
@pytest.mark.skip(reason="compatability is temporary broken")
def test_polymorphic_parts_diff_versions_2(start_cluster_diff_versions):
# Replication doesn't work on old version if part is created in compact format, because
# this version doesn't know anything about it. It's considered to be ok.
node_old = node7
node_new = node8
insert_random_data('polymorphic_table_2', node_new, 100)
assert node_new.query("SELECT count() FROM polymorphic_table_2") == "100\n"
assert node_old.query("SELECT count() FROM polymorphic_table_2") == "0\n"
with pytest.raises(Exception):
node_old.query("SYSTEM SYNC REPLICA polymorphic_table_2", timeout=3)
node_old.query("SYSTEM SYNC REPLICA polymorphic_table_2", timeout=20)
# Works after update
assert node_old.query("SELECT count() FROM polymorphic_table_2") == "100\n"
assert node_old.query(
"SELECT DISTINCT part_type FROM system.parts WHERE table = 'polymorphic_table_2' and active") == "Compact\n"
# Stops merges on node1 and node2, inserts 100 rows through each of them into non_adaptive_table
# (syncing the other replica after each insert), compares the per-type counts of active parts on both
# replicas, and finally checks that node1's log contains the adaptive-granularity warning.
# NOTE(review): both `== TSV(` comparisons lack their argument and closing parenthesis in this view,
# and no `expected` value is defined in this function -- the expected part-type listing cannot be
# determined from here; confirm against the complete file.
def test_polymorphic_parts_non_adaptive(start_cluster):
node1.query("SYSTEM STOP MERGES")
node2.query("SYSTEM STOP MERGES")
insert_random_data('non_adaptive_table', node1, 100)
node2.query("SYSTEM SYNC REPLICA non_adaptive_table", timeout=20)
insert_random_data('non_adaptive_table', node2, 100)
node1.query("SYSTEM SYNC REPLICA non_adaptive_table", timeout=20)
assert TSV(node1.query("SELECT part_type, count() FROM system.parts " \
"WHERE table = 'non_adaptive_table' AND active GROUP BY part_type ORDER BY part_type")) == TSV(
assert TSV(node2.query("SELECT part_type, count() FROM system.parts " \
"WHERE table = 'non_adaptive_table' AND active GROUP BY part_type ORDER BY part_type")) == TSV(
assert node1.contains_in_log(
"<Warning> default.non_adaptive_table: Table can't create parts with adaptive granularity")
# With merges stopped, inserts batches of 200, 200, 300 and 600 rows into in_memory_table through
# node9, syncs node10 and checks the row count (1300) and the per-type part counts on both replicas.
# Merges are then restarted, a final OPTIMIZE is retried until it succeeds, and counts and part
# types are checked again.
# NOTE(review): all four `== TSV(` comparisons lack their argument and closing parenthesis in this
# view. `expected` is defined before the first pair only; the value expected after the merge is not
# visible -- confirm against the complete file rather than reusing `expected`.
def test_in_memory(start_cluster):
node9.query("SYSTEM STOP MERGES")
node10.query("SYSTEM STOP MERGES")
for size in [200, 200, 300, 600]:
insert_random_data('in_memory_table', node9, size)
node10.query("SYSTEM SYNC REPLICA in_memory_table", timeout=20)
assert node9.query("SELECT count() FROM in_memory_table") == "1300\n"
assert node10.query("SELECT count() FROM in_memory_table") == "1300\n"
expected = "Compact\t1\nInMemory\t2\nWide\t1\n"
assert TSV(node9.query("SELECT part_type, count() FROM system.parts " \
"WHERE table = 'in_memory_table' AND active GROUP BY part_type ORDER BY part_type")) == TSV(
assert TSV(node10.query("SELECT part_type, count() FROM system.parts " \
"WHERE table = 'in_memory_table' AND active GROUP BY part_type ORDER BY part_type")) == TSV(
node9.query("SYSTEM START MERGES")
node10.query("SYSTEM START MERGES")
assert_eq_with_retry(node9, "OPTIMIZE TABLE in_memory_table FINAL SETTINGS optimize_throw_if_noop = 1", "")
node10.query("SYSTEM SYNC REPLICA in_memory_table", timeout=20)
assert node9.query("SELECT count() FROM in_memory_table") == "1300\n"
assert node10.query("SELECT count() FROM in_memory_table") == "1300\n"
assert TSV(node9.query("SELECT part_type, count() FROM system.parts " \
"WHERE table = 'in_memory_table' AND active GROUP BY part_type ORDER BY part_type")) == TSV(
assert TSV(node10.query("SELECT part_type, count() FROM system.parts " \
"WHERE table = 'in_memory_table' AND active GROUP BY part_type ORDER BY part_type")) == TSV(
# Exercises the write-ahead log of wal_table on node11/node12: inserts five batches of 50 rows,
# checks row and InMemory-part counts, truncates the last 10 bytes of node11's wal.bin while
# replication is partitioned, checks for the renamed wal_0_4.bin file, re-fetches the lost part and
# finally disables in_memory_parts_enable_wal to observe data loss.
# NOTE(review): several consecutive check(...) calls expect different values with no visible action
# between them (e.g. 350/7 immediately followed by 300/6) and the body carries no indentation --
# statements appear to be missing from this view; confirm against the complete file.
# NOTE(review): inside check(), the first comparison is a bare expression whose result is discarded,
# and the second lacks both its opening call and the closing `parts)` argument, so as visible here
# check() verifies nothing.
# NOTE(review): open(wal_file, 'rw+') uses a mode string that Python 3's open() rejects with
# ValueError, and the file object is never closed -- confirm the target interpreter.
def test_in_memory_wal(start_cluster):
# Merges are disabled in config
for i in range(5):
insert_random_data('wal_table', node11, 50)
node12.query("SYSTEM SYNC REPLICA wal_table", timeout=20)
def check(node, rows, parts):
node.query("SELECT count() FROM wal_table") == "{}\n".format(rows)
"SELECT count() FROM system.parts WHERE table = 'wal_table' AND part_type = 'InMemory'") == "{}\n".format(
check(node11, 250, 5)
check(node12, 250, 5)
# WAL works at inserts
check(node11, 250, 5)
# WAL works at fetches
check(node12, 250, 5)
insert_random_data('wal_table', node11, 50)
node12.query("SYSTEM SYNC REPLICA wal_table", timeout=20)
# Disable replication
with PartitionManager() as pm:
pm.partition_instances(node11, node12)
check(node11, 300, 6)
wal_file = os.path.join(node11.path, "database/data/default/wal_table/wal.bin")
# Corrupt wal file
open(wal_file, 'rw+').truncate(os.path.getsize(wal_file) - 10)
# Broken part is lost, but other restored successfully
check(node11, 250, 5)
# WAL with blocks from 0 to 4
broken_wal_file = os.path.join(node11.path, "database/data/default/wal_table/wal_0_4.bin")
assert os.path.exists(broken_wal_file)
# Fetch lost part from replica
node11.query("SYSTEM SYNC REPLICA wal_table", timeout=20)
check(node11, 300, 6)
# Check that new data is written to new wal, but old is still exists for restoring
assert os.path.getsize(wal_file) > 0
assert os.path.exists(broken_wal_file)
# Data is lost without WAL
node11.query("ALTER TABLE wal_table MODIFY SETTING in_memory_parts_enable_wal = 0")
with PartitionManager() as pm:
pm.partition_instances(node11, node12)
insert_random_data('wal_table', node11, 50)
check(node11, 350, 7)
check(node11, 300, 6)
# Sets write_ahead_log_max_bytes = 10 on restore_table, inserts five batches of 50 rows through
# node11 and expects one wal_<i>_<i>.bin file per batch. After adjusting merge settings on both
# replicas and forcing a final OPTIMIZE, it expects those files to be gone and an empty wal.bin
# to exist.
# NOTE(review): the first ALTER inside the `for node in [node11, node12]` loop lacks its opening
# call (only the string literal and closing parenthesis are visible), and no restart follows the
# "Restart to be sure..." comment -- lines appear to be missing from this view; confirm against the
# complete file.
def test_in_memory_wal_rotate(start_cluster):
# Write every part to single wal
node11.query("ALTER TABLE restore_table MODIFY SETTING write_ahead_log_max_bytes = 10")
for i in range(5):
insert_random_data('restore_table', node11, 50)
for i in range(5):
wal_file = os.path.join(node11.path, "database/data/default/restore_table/wal_{0}_{0}.bin".format(i))
assert os.path.exists(wal_file)
for node in [node11, node12]:
"ALTER TABLE restore_table MODIFY SETTING number_of_free_entries_in_pool_to_lower_max_size_of_merge = 0")
node.query("ALTER TABLE restore_table MODIFY SETTING max_bytes_to_merge_at_max_space_in_pool = 10000000")
assert_eq_with_retry(node11, "OPTIMIZE TABLE restore_table FINAL SETTINGS optimize_throw_if_noop = 1", "")
# Restart to be sure, that clearing stale logs task was ran
for i in range(5):
wal_file = os.path.join(node11.path, "database/data/default/restore_table/wal_{0}_{0}.bin".format(i))
assert not os.path.exists(wal_file)
# New wal file was created and ready to write part to it
wal_file = os.path.join(node11.path, "database/data/default/restore_table/wal.bin")
assert os.path.exists(wal_file)
assert os.path.getsize(wal_file) == 0
def test_in_memory_deduplication(start_cluster):
    """Check that the same row inserted repeatedly on both replicas is stored exactly once.

    Args:
        start_cluster: cluster set-up dependency; not used directly in the body.
    """
    insert_row = "INSERT INTO deduplication_table (date, id, s) VALUES (toDate('2020-03-03'), 1, 'foo')"
    select_rows = "SELECT date, id, s FROM deduplication_table"

    # The identical block is sent three times to each replica; only one copy may survive.
    for _ in range(3):
        node9.query(insert_row)
        node10.query(insert_row)

    node9.query("SYSTEM SYNC REPLICA deduplication_table", timeout=20)
    node10.query("SYSTEM SYNC REPLICA deduplication_table", timeout=20)

    assert node9.query(select_rows) == "2020-03-03\t1\tfoo\n"
    assert node10.query(select_rows) == "2020-03-03\t1\tfoo\n"
# Checks that restoring from WAL works after table schema changed
def test_in_memory_alters(start_cluster):
    """Check reads from `alters_table` on node9 across ADD, MODIFY and DROP COLUMN.

    Args:
        start_cluster: cluster set-up dependency; not used directly in the body.

    NOTE(review): the comment above mentions restoring from the WAL, but no restart of node9 and
    no call to `check_parts_type` is visible in this function -- statements may be missing from
    this view; confirm against the complete file.
    """

    def check_parts_type(parts_num):
        # Asserts that the active parts of alters_table are all InMemory and number `parts_num`.
        assert node9.query(
            "SELECT part_type, count() FROM system.parts WHERE table = 'alters_table' "
            "AND active GROUP BY part_type") == "InMemory\t{}\n".format(parts_num)

    node9.query(
        "INSERT INTO alters_table (date, id, s) VALUES (toDate('2020-10-10'), 1, 'ab'), (toDate('2020-10-10'), 2, 'cd')")
    node9.query("ALTER TABLE alters_table ADD COLUMN col1 UInt32")

    # Rows written before the ADD COLUMN must read the new column as its default value.
    expected = "1\tab\t0\n2\tcd\t0\n"
    assert node9.query("SELECT id, s, col1 FROM alters_table") == expected

    node9.query("INSERT INTO alters_table (date, id, col1) VALUES (toDate('2020-10-10'), 3, 100)")
    node9.query("ALTER TABLE alters_table MODIFY COLUMN col1 String")
    node9.query("ALTER TABLE alters_table DROP COLUMN s")

    # Column `s` no longer exists, so selecting it has to fail.
    with pytest.raises(Exception):
        node9.query("SELECT id, s, col1 FROM alters_table")

    # The result used to be checked for truthiness only and was never compared with `expected`.
    # ORDER BY makes the comparison independent of the order in which the two parts are read.
    expected = "1\t0_foo\n2\t0_foo\n3\t100_foo\n"
    assert node9.query("SELECT id, col1 || '_foo' FROM alters_table ORDER BY id") == expected
# Creates table index_compact on node1, inserts 100 and then 30 rows, merges them with a final
# OPTIMIZE and expects a single Compact part with 2 marks. It then reads primary.idx of part
# all_1_2_1 and expects it to be 8 bytes long, holding the two unsigned 32-bit values 0 and 99.
# NOTE(review): the CREATE TABLE statement lacks the call and opening quote that should start it,
# and no ENGINE clause is visible between the column list and SETTINGS -- lines appear to be missing
# from this view; confirm against the complete file.
# NOTE(review): the file object returned by open(index_path, 'rb') is never closed in the visible code.
def test_polymorphic_parts_index(start_cluster):
CREATE TABLE index_compact(a UInt32, s String)
SETTINGS min_rows_for_wide_part = 1000, index_granularity = 128, merge_max_block_size = 100''')
node1.query("INSERT INTO index_compact SELECT number, toString(number) FROM numbers(100)")
node1.query("INSERT INTO index_compact SELECT number, toString(number) FROM numbers(30)")
node1.query("OPTIMIZE TABLE index_compact FINAL")
assert node1.query("SELECT part_type FROM system.parts WHERE table = 'index_compact' AND active") == "Compact\n"
assert node1.query("SELECT marks FROM system.parts WHERE table = 'index_compact' AND active") == "2\n"
index_path = os.path.join(node1.path, "database/data/default/index_compact/all_1_2_1/primary.idx")
f = open(index_path, 'rb')
assert os.path.getsize(index_path) == 8
assert struct.unpack('I', f.read(4))[0] == 0
assert struct.unpack('I', f.read(4))[0] == 99