ClickHouse/tests/integration/test_multiple_disks/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

2223 lines
71 KiB
Python
Raw Normal View History

2019-10-23 11:25:51 +00:00
import json
2019-08-20 18:00:48 +00:00
import random
2019-10-23 11:25:51 +00:00
import re
2019-08-20 18:00:48 +00:00
import string
import threading
2019-10-23 11:25:51 +00:00
import time
from multiprocessing.dummy import Pool
import pytest
from helpers.client import QueryRuntimeException
2019-07-15 09:36:02 +00:00
from helpers.cluster import ClickHouseCluster
def _add_storage_node(name, replica):
    """Register one instance that uses the multi-disk storage configuration.

    Both instances are configured identically apart from their name and the
    ``replica`` macro. Fresh list/dict arguments are built on every call so
    the instances never share mutable containers.
    """
    return cluster.add_instance(
        name,
        main_configs=[
            "configs/logs_config.xml",
            "configs/config.d/storage_configuration.xml",
            "configs/config.d/cluster.xml",
        ],
        with_zookeeper=True,
        stay_alive=True,
        # Size-limited tmpfs mounts: two 40M ones and a 200M one.
        tmpfs=["/jbod1:size=40M", "/jbod2:size=40M", "/external:size=200M"],
        macros={"shard": 0, "replica": replica},
    )


cluster = ClickHouseCluster(__file__)
node1 = _add_storage_node("node1", 1)
node2 = _add_storage_node("node2", 2)
2019-07-15 09:36:02 +00:00
@pytest.fixture(scope="module")
def start_cluster():
    """Module-scoped fixture: start the cluster, hand it to the tests, then shut it down.

    ``cluster.shutdown()`` sits in ``finally`` so it runs both after the last
    test of the module and when ``cluster.start()`` itself raises.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
2019-09-09 13:50:19 +00:00
def test_system_tables(start_cluster):
    """Compare system.disks and system.storage_policies on node1 with the expected rows.

    Both result sets are sorted before comparison, so the order in which the
    server returns rows does not matter.
    """

    def disk(name, path, keep_free_space):
        # One expected row of system.disks (only the three selected columns).
        return {"name": name, "path": path, "keep_free_space": keep_free_space}

    def volume(
        policy,
        name,
        priority,
        disks,
        max_data_part_size="0",
        move_factor=0.1,
        prefer_not_to_merge=0,
    ):
        # One expected row of system.storage_policies. Columns that have the
        # same value for every expected volume are filled in here.
        return {
            "policy_name": policy,
            "volume_name": name,
            "volume_priority": priority,
            "disks": disks,
            "volume_type": "JBOD",
            "max_data_part_size": max_data_part_size,
            "move_factor": move_factor,
            "prefer_not_to_merge": prefer_not_to_merge,
            "perform_ttl_move_on_insert": 1,
            "load_balancing": "ROUND_ROBIN",
        }

    def by_name(row):
        return row["name"]

    def by_policy_and_volume(row):
        return (row["policy_name"], row["volume_name"], row["volume_priority"])

    expected_disks = [
        disk("default", "/var/lib/clickhouse/", "1024"),
        disk("jbod1", "/jbod1/", "0"),
        disk("jbod2", "/jbod2/", "10485760"),
        disk("external", "/external/", "0"),
    ]

    disks_response = node1.query(
        "SELECT name, path, keep_free_space FROM system.disks FORMAT JSON"
    )
    actual_disks = json.loads(disks_response)["data"]
    assert sorted(actual_disks, key=by_name) == sorted(expected_disks, key=by_name)

    expected_policies = [
        volume("small_jbod_with_external", "main", "1", ["jbod1"]),
        volume("small_jbod_with_external", "external", "2", ["external"]),
        volume("small_jbod_with_external_no_merges", "main", "1", ["jbod1"]),
        volume(
            "small_jbod_with_external_no_merges",
            "external",
            "2",
            ["external"],
            prefer_not_to_merge=1,
        ),
        volume("one_more_small_jbod_with_external", "m", "1", ["jbod1"]),
        volume("one_more_small_jbod_with_external", "e", "2", ["external"]),
        volume(
            "jbods_with_external",
            "main",
            "1",
            ["jbod1", "jbod2"],
            max_data_part_size="10485760",
        ),
        volume("jbods_with_external", "external", "2", ["external"]),
        volume("moving_jbod_with_external", "main", "1", ["jbod1"], move_factor=0.7),
        volume(
            "moving_jbod_with_external", "external", "2", ["external"], move_factor=0.7
        ),
        volume(
            "default_disk_with_external",
            "small",
            "1",
            ["default"],
            max_data_part_size="2097152",
        ),
        volume(
            "default_disk_with_external",
            "big",
            "2",
            ["external"],
            max_data_part_size="20971520",
        ),
        volume(
            "special_warning_policy", "special_warning_zero_volume", "1", ["default"]
        ),
        volume(
            "special_warning_policy",
            "special_warning_default_volume",
            "2",
            ["external"],
        ),
        volume(
            "special_warning_policy",
            "special_warning_small_volume",
            "3",
            ["jbod1"],
            max_data_part_size="1024",
        ),
        volume(
            "special_warning_policy",
            "special_warning_big_volume",
            "4",
            ["jbod2"],
            max_data_part_size="1024000000",
        ),
    ]

    policies_response = node1.query(
        "SELECT * FROM system.storage_policies WHERE policy_name != 'default' FORMAT JSON"
    )
    actual_policies = json.loads(policies_response)["data"]
    assert sorted(actual_policies, key=by_policy_and_volume) == sorted(
        expected_policies, key=by_policy_and_volume
    )
2019-09-09 13:50:19 +00:00
2019-09-09 12:41:46 +00:00
def test_query_parser(start_cluster):
    """Check that invalid storage-policy, MOVE and MODIFY SETTING queries raise.

    Each rejected statement is expected to raise ``QueryRuntimeException``;
    the table using the 'default' policy must be created and filled normally.
    """

    def assert_rejected(query):
        with pytest.raises(QueryRuntimeException):
            node1.query(query)

    try:
        # Neither value may be used as storage_policy in CREATE TABLE.
        for rejected_policy in ("very_exciting_policy", "jbod1"):
            assert_rejected(
                f"""
                CREATE TABLE IF NOT EXISTS table_with_absent_policy (
                    d UInt64
                ) ENGINE = MergeTree()
                ORDER BY d
                SETTINGS storage_policy='{rejected_policy}'
            """
            )

        node1.query(
            """
                CREATE TABLE IF NOT EXISTS table_with_normal_policy (
                    d UInt64
                ) ENGINE = MergeTree()
                ORDER BY d
                SETTINGS storage_policy='default'
            """
        )

        node1.query("INSERT INTO table_with_normal_policy VALUES (5)")

        rejected_alter_clauses = (
            "MOVE PARTITION tuple() TO VOLUME 'some_volume'",
            "MOVE PARTITION tuple() TO DISK 'some_volume'",
            "MOVE PART 'xxxxx' TO DISK 'jbod1'",
            "MOVE PARTITION 'yyyy' TO DISK 'jbod1'",
            "MODIFY SETTING storage_policy='moving_jbod_with_external'",
        )
        for clause in rejected_alter_clauses:
            assert_rejected(f"ALTER TABLE table_with_normal_policy {clause}")
    finally:
        node1.query("DROP TABLE IF EXISTS table_with_normal_policy SYNC")
2019-09-09 12:41:46 +00:00
@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("test_alter_policy", "MergeTree()", id="mt"),
        pytest.param(
            "replicated_test_alter_policy",
            "ReplicatedMergeTree('/clickhouse/test_alter_policy', '1')",
            id="replicated",
        ),
    ],
)
def test_alter_policy(start_cluster, name, engine):
    """Check which storage_policy changes via ALTER ... MODIFY SETTING are accepted.

    After every attempt the policy reported by system.tables is re-read: a
    rejected ALTER must leave it untouched, an accepted one must update it.
    """

    def current_policy():
        return node1.query(
            f"""SELECT storage_policy FROM system.tables WHERE name = '{name}'"""
        )

    def alter_policy_query(policy):
        return f"""ALTER TABLE {name} MODIFY SETTING storage_policy='{policy}'"""

    try:
        node1.query_with_retry(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                d UInt64
            ) ENGINE = {engine}
            ORDER BY d
            SETTINGS storage_policy='small_jbod_with_external'
        """
        )
        assert current_policy() == "small_jbod_with_external\n"

        # Expected to be rejected; the table keeps its policy.
        with pytest.raises(QueryRuntimeException):
            node1.query(alter_policy_query("one_more_small_jbod_with_external"))
        assert current_policy() == "small_jbod_with_external\n"

        # Expected to be accepted.
        node1.query_with_retry(alter_policy_query("jbods_with_external"))
        assert current_policy() == "jbods_with_external\n"

        # Switching back to the original policy is expected to be rejected.
        with pytest.raises(QueryRuntimeException):
            node1.query(alter_policy_query("small_jbod_with_external"))
        assert current_policy() == "jbods_with_external\n"
    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
2020-01-09 13:52:37 +00:00
2019-08-20 18:00:48 +00:00
def get_random_string(length):
    """Return the SQL expression ``randomPrintableASCII(<length>)`` as text.

    No random data is produced here; the returned text is meant to be
    embedded into a query string.
    """
    return f"randomPrintableASCII({length})"
2019-08-20 18:00:48 +00:00
2019-08-20 18:00:48 +00:00
def get_used_disks_for_table(node, table_name):
    """Return the disk name of every active part of ``table_name`` as a tuple.

    Entries follow the query's ``order by modification_time``. When the query
    returns no rows the result is ``("",)`` rather than an empty tuple,
    because an empty string split on a newline yields one empty item.
    """
    query = (
        f"select disk_name from system.parts where table == '{table_name}'"
        " and active=1 order by modification_time"
    )
    disk_names = node.query(query).strip().split("\n")
    return tuple(disk_names)
2019-08-20 18:00:48 +00:00
def get_used_parts_for_table(node, table_name):
    """Return the names of the active parts of ``table_name`` as a list.

    Entries follow the query's ``ORDER BY modification_time``; the list is
    empty when the query returns no rows.
    """
    query = (
        f"SELECT name FROM system.parts WHERE table = '{table_name}'"
        " AND active = 1 ORDER BY modification_time"
    )
    response = node.query(query)
    return response.splitlines()
2019-10-23 11:25:51 +00:00
def test_no_warning_about_zero_max_data_part_size(start_cluster):
    """Check which volumes of 'special_warning_policy' show up in server log warnings.

    On both nodes a table is created and dropped, then the server log is
    searched: only 'special_warning_small_volume' is expected to be matched
    by a "Warning ... Volume ..." line.
    """

    def read_server_log(node):
        return node.exec_in_container(
            ["bash", "-c", "cat /var/log/clickhouse-server/clickhouse-server.log"]
        )

    # (volume name, whether a matching warning line must be present)
    expectations = (
        ("special_warning_zero_volume", False),
        ("special_warning_default_volume", False),
        ("special_warning_small_volume", True),
        ("special_warning_big_volume", False),
    )

    for node in (node1, node2):
        node.query(
            """
            CREATE TABLE IF NOT EXISTS default.test_warning_table (
                s String
            ) ENGINE = MergeTree
            ORDER BY tuple()
            SETTINGS storage_policy='small_jbod_with_external'
        """
        )
        node.query("DROP TABLE IF EXISTS default.test_warning_table SYNC")

        log = read_server_log(node)
        for volume_name, warning_expected in expectations:
            match = re.search("Warning.*Volume.*" + volume_name, log)
            if warning_expected:
                assert match
            else:
                assert not match
@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("mt_on_jbod", "MergeTree()", id="mt"),
        pytest.param(
            "replicated_mt_on_jbod",
            "ReplicatedMergeTree('/clickhouse/replicated_mt_on_jbod', '1')",
            id="replicated",
        ),
    ],
)
def test_round_robin(start_cluster, name, engine):
    """Check that consecutive inserts alternate between the disks of the main volume."""

    def insert_numbers(numbers_args):
        node1.query_with_retry(
            f"insert into {name} select * from numbers({numbers_args})"
        )

    def disks_in_use():
        return get_used_disks_for_table(node1, name)

    try:
        node1.query_with_retry(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                d UInt64
            ) ENGINE = {engine}
            ORDER BY d
            SETTINGS storage_policy='jbods_with_external'
        """
        )

        # First part: exactly one disk may be in use.
        insert_numbers("10000")
        used_disk = disks_in_use()
        assert len(used_disk) == 1, "More than one disk used for single insert"

        # Disks are listed ordered by part modification time. Without a pause
        # two fast inserts could get equal timestamps and the order would no
        # longer be deterministic.
        time.sleep(5)
        insert_numbers("10000, 10000")
        used_disks = disks_in_use()
        assert len(used_disks) == 2, "Two disks should be used for two parts"
        assert used_disks[0] != used_disks[1], "Should write to different disks"

        time.sleep(5)
        insert_numbers("20000, 10000")
        used_disks = disks_in_use()

        # Expected pattern: jbod1 -> jbod2 -> jbod1 -> jbod2 ...
        assert len(used_disks) == 3
        assert used_disks[0] != used_disks[1]
        assert used_disks[2] == used_disks[0]
    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
2019-08-20 18:00:48 +00:00
@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("mt_with_huge_part", "MergeTree()", id="mt"),
        pytest.param(
            "replicated_mt_with_huge_part",
            "ReplicatedMergeTree('/clickhouse/replicated_mt_with_huge_part', '1')",
            id="replicated",
        ),
    ],
)
def test_max_data_part_size(start_cluster, name, engine):
    """Check that a part of ten 1MB rows is written to the 'external' disk.

    The 'main' volume of 'jbods_with_external' is first verified to have
    max_data_part_size equal to 10 MiB.
    """
    try:
        size_limit_rows = node1.query(
            """SELECT max_data_part_size FROM system.storage_policies WHERE policy_name = 'jbods_with_external' AND volume_name = 'main'"""
        ).splitlines()
        # The query is expected to return exactly one line; int() receives it
        # as its sole argument.
        assert int(*size_limit_rows) == 10 * 1024 * 1024

        node1.query_with_retry(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                s1 String
            ) ENGINE = {engine}
            ORDER BY tuple()
            SETTINGS storage_policy='jbods_with_external'
        """
        )

        # Ten rows of 1MB each, i.e. 10MB in a single insert.
        row_expressions = [get_random_string(1024 * 1024) for _ in range(10)]
        values = ",".join(f"({expression})" for expression in row_expressions)
        node1.query_with_retry(f"INSERT INTO {name} VALUES {values}")

        used_disks = get_used_disks_for_table(node1, name)
        assert len(used_disks) == 1
        assert used_disks[0] == "external"
    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
2019-08-20 18:00:48 +00:00
@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("mt_with_overflow", "MergeTree()", id="mt"),
        pytest.param(
            "replicated_mt_with_overflow",
            "ReplicatedMergeTree('/clickhouse/replicated_mt_with_overflow', '1')",
            id="replicated",
        ),
    ],
)
def test_jbod_overflow(start_cluster, name, engine):
    """Check that writes and merges go to 'external' once jbod1 is nearly full."""

    def insert_megabyte_rows(row_count):
        # Every row is a 1MB random string generated on the server side.
        values = ",".join(
            f"({get_random_string(1024 * 1024)})" for _ in range(row_count)
        )
        node1.query_with_retry(f"INSERT INTO {name} VALUES {values}")

    try:
        node1.query_with_retry(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                s1 String
            ) ENGINE = {engine}
            ORDER BY tuple()
            SETTINGS storage_policy='small_jbod_with_external'
        """
        )

        node1.query(f"SYSTEM STOP MERGES {name}")
        # The test uses 35/40=87.5% of the disk. During the last INSERT the
        # parts mover may see up to ~100% used space because of reservations:
        # INSERT first reserves the space and then writes roughly the same
        # amount, and until the reservation is destroyed it still counts as
        # reserved on the disk. Moves are therefore stopped as well.
        node1.query(f"SYSTEM STOP MOVES {name}")

        # The small jbod is 40MB, so insert a 5MB batch 7 times.
        for _ in range(7):
            insert_megabyte_rows(5)

        used_disks = get_used_disks_for_table(node1, name)
        assert used_disks == tuple("jbod1" for _ in used_disks)

        # A 10MB insert should go to the external disk (jbod is overflown).
        insert_megabyte_rows(10)

        used_disks = get_used_disks_for_table(node1, name)
        assert used_disks[-1] == "external"

        node1.query(f"SYSTEM START MERGES {name}")
        node1.query(f"SYSTEM START MOVES {name}")
        time.sleep(1)

        node1.query_with_retry(f"OPTIMIZE TABLE {name} FINAL")
        time.sleep(2)

        merged_parts_response = node1.query(
            f"SELECT disk_name FROM system.parts WHERE table == '{name}' AND level >= 1 and active = 1 ORDER BY modification_time"
        )
        disks_for_merges = tuple(merged_parts_response.strip().split("\n"))

        # Every active part with level >= 1 must be on 'external'.
        assert disks_for_merges == tuple("external" for _ in disks_for_merges)
    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
2019-08-20 18:00:48 +00:00
@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("moving_mt", "MergeTree()", id="mt"),
        pytest.param(
            "moving_replicated_mt",
            "ReplicatedMergeTree('/clickhouse/moving_replicated_mt', '1')",
            id="replicated",
        ),
    ],
)
def test_background_move(start_cluster, name, engine):
    """Check that parts are moved from jbod1 to 'external' in the background.

    After five 5MB inserts the test polls until at most two parts remain on
    jbod1, then verifies the move through system.parts and system.part_log.
    """

    def parts_on_jbod1(disks):
        return sum(1 for disk in disks if disk == "jbod1")

    try:
        node1.query_with_retry(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                s1 String
            ) ENGINE = {engine}
            ORDER BY tuple()
            SETTINGS storage_policy='moving_jbod_with_external'
        """
        )

        node1.query(f"SYSTEM STOP MERGES {name}")

        # The small jbod is 40MB, so insert a 5MB batch (five 1MB rows) 5 times.
        for _ in range(5):
            values = ",".join(
                f"({get_random_string(1024 * 1024)})" for _ in range(5)
            )
            node1.query_with_retry(f"INSERT INTO {name} VALUES {values}")

        used_disks = get_used_disks_for_table(node1, name)

        # Poll up to 20 times, 0.5s apart, for the moves to happen.
        for _ in range(20):
            if parts_on_jbod1(used_disks) <= 2:
                break
            time.sleep(0.5)
            used_disks = get_used_disks_for_table(node1, name)

        assert parts_on_jbod1(used_disks) <= 2

        # The first (oldest) part must have been moved to external.
        assert used_disks[0] == "external"

        node1.query("SYSTEM FLUSH LOGS")
        moved_part_path = node1.query(
            f"SELECT path_on_disk FROM system.part_log WHERE table = '{name}' AND event_type='MovePart' AND part_name = 'all_1_1_0'"
        )

        # The part log must record the move of all_1_1_0 to the external disk.
        assert moved_part_path.startswith("/external")

        node1.query(f"SYSTEM START MERGES {name}")
    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
2019-08-20 18:00:48 +00:00
@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("stopped_moving_mt", "MergeTree()", id="mt"),
        pytest.param(
            "stopped_moving_replicated_mt",
            "ReplicatedMergeTree('/clickhouse/stopped_moving_replicated_mt', '1')",
            id="replicated",
        ),
    ],
)
def test_start_stop_moves(start_cluster, name, engine):
    """Check that SYSTEM STOP/START MOVES blocks and unblocks part moves.

    Phase one stops moves server-wide and expects a manual ALTER ... MOVE PART
    to raise; after restarting moves the same ALTER must succeed. Phase two
    stops moves for this table only, fills jbod1 and expects no background
    move until moves are started again.
    """

    def parts_on_jbod1(disks):
        return sum(1 for disk in disks if disk == "jbod1")

    try:
        node1.query_with_retry(
            """
            CREATE TABLE IF NOT EXISTS {name} (
                s1 String
            ) ENGINE = {engine}
            ORDER BY tuple()
            SETTINGS storage_policy='moving_jbod_with_external'
        """.format(
                name=name, engine=engine
            )
        )

        node1.query_with_retry("INSERT INTO {} VALUES ('HELLO')".format(name))
        node1.query_with_retry("INSERT INTO {} VALUES ('WORLD')".format(name))

        used_disks = get_used_disks_for_table(node1, name)
        assert all(d == "jbod1" for d in used_disks), "All writes shoud go to jbods"

        first_part = node1.query(
            "SELECT name FROM system.parts WHERE table = '{}' and active = 1 ORDER BY modification_time LIMIT 1".format(
                name
            )
        ).strip()

        # This STOP MOVES has no table name and applies to the whole server.
        # It is undone in `finally` so that a failing assertion here cannot
        # leave moves disabled for the tests that run afterwards.
        node1.query("SYSTEM STOP MOVES")
        try:
            with pytest.raises(QueryRuntimeException):
                node1.query(
                    "ALTER TABLE {} MOVE PART '{}' TO VOLUME 'external'".format(
                        name, first_part
                    )
                )

            used_disks = get_used_disks_for_table(node1, name)
            assert all(
                d == "jbod1" for d in used_disks
            ), "Blocked moves doesn't actually move something"
        finally:
            node1.query("SYSTEM START MOVES")

        node1.query(
            "ALTER TABLE {} MOVE PART '{}' TO VOLUME 'external'".format(
                name, first_part
            )
        )

        disk = node1.query(
            "SELECT disk_name FROM system.parts WHERE table = '{}' and name = '{}' and active = 1".format(
                name, first_part
            )
        ).strip()

        assert disk == "external"

        node1.query_with_retry("TRUNCATE TABLE {}".format(name))

        node1.query("SYSTEM STOP MOVES {}".format(name))
        node1.query("SYSTEM STOP MERGES {}".format(name))

        # jbod size is 40MB, so insert a 5MB batch (five 1MB rows) 5 times.
        for _ in range(5):
            data = [get_random_string(1024 * 1024) for _ in range(5)]
            node1.query_with_retry(
                "INSERT INTO {} VALUES {}".format(
                    name, ",".join(["(" + x + ")" for x in data])
                )
            )

        used_disks = get_used_disks_for_table(node1, name)

        # Give a background move a short chance to happen (it must not).
        retry = 5
        attempt = 0
        while not parts_on_jbod1(used_disks) <= 2 and attempt < retry:
            time.sleep(0.1)
            used_disks = get_used_disks_for_table(node1, name)
            attempt += 1

        # first (oldest) part doesn't move anywhere
        assert used_disks[0] == "jbod1"

        node1.query("SYSTEM START MOVES {}".format(name))

        # wait sometime until background backoff finishes
        retry = 60
        attempt = 0
        while not parts_on_jbod1(used_disks) <= 2 and attempt < retry:
            time.sleep(1)
            used_disks = get_used_disks_for_table(node1, name)
            attempt += 1

        node1.query("SYSTEM START MERGES {}".format(name))

        assert parts_on_jbod1(used_disks) <= 2

        # first (oldest) part moved to external
        assert used_disks[0] == "external"
    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
2019-09-03 11:32:25 +00:00
2019-09-03 11:32:25 +00:00
def get_path_for_part_from_part_log(node, table, part_name):
    """Return ``path_on_disk`` of the newest system.part_log row for a part.

    Logs are flushed first. The result is stripped of surrounding whitespace
    and is an empty string when no row matches.
    """
    node.query("SYSTEM FLUSH LOGS")
    query = (
        f"SELECT path_on_disk FROM system.part_log WHERE table = '{table}'"
        f" and part_name = '{part_name}' ORDER BY event_time DESC LIMIT 1"
    )
    return node.query(query).strip()
2019-09-03 11:32:25 +00:00
def get_paths_for_partition_from_part_log(node, table, partition_id):
    """Return ``path_on_disk`` values from system.part_log for a partition, newest first.

    Logs are flushed first. When no row matches the result is ``[""]``,
    because an empty string split on a newline yields one empty item.
    """
    node.query("SYSTEM FLUSH LOGS")
    query = (
        f"SELECT path_on_disk FROM system.part_log WHERE table = '{table}'"
        f" and partition_id = '{partition_id}' ORDER BY event_time DESC"
    )
    response = node.query(query)
    return response.strip().split("\n")
@pytest.mark.parametrize(
    "name,engine,use_metadata_cache",
    [
        pytest.param("altering_mt", "MergeTree()", "false", id="mt"),
        pytest.param("altering_mt", "MergeTree()", "true", id="mt_use_metadata_cache"),
        # ("altering_replicated_mt","ReplicatedMergeTree('/clickhouse/altering_replicated_mt', '1')",),
        # SYSTEM STOP MERGES doesn't disable merges assignments
    ],
)
def test_alter_move(start_cluster, name, engine, use_metadata_cache):
    """Check ALTER ... MOVE PART / MOVE PARTITION to a volume and to a disk.

    After each move the table must pass CHECK TABLE, system.parts must report
    the target disk, and the newest system.part_log paths must start with the
    target mount point.
    """

    def assert_table_passes_check():
        assert node1.query("CHECK TABLE " + name) == "1\n"

    def disk_of_part(part):
        return node1.query(
            f"SELECT disk_name FROM system.parts WHERE table = '{name}' and name = '{part}' and active = 1"
        ).strip()

    def disks_of_partition_201904():
        response = node1.query(
            f"SELECT disk_name FROM system.parts WHERE table = '{name}' and partition = '201904' and active = 1"
        )
        return response.strip().split("\n")

    def newest_two_paths_of_partition_201904():
        return get_paths_for_partition_from_part_log(node1, name, "201904")[:2]

    try:
        node1.query(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                EventDate Date,
                number UInt64
            ) ENGINE = {engine}
            ORDER BY tuple()
            PARTITION BY toYYYYMM(EventDate)
            SETTINGS storage_policy='jbods_with_external', use_metadata_cache={use_metadata_cache}
        """
        )

        node1.query(f"SYSTEM STOP MERGES {name}")  # to avoid conflicts

        # Two single-row inserts in March 2019 and two in April 2019.
        rows = (
            ("2019-03-15", 65),
            ("2019-03-16", 66),
            ("2019-04-10", 42),
            ("2019-04-11", 43),
        )
        for event_date, number in rows:
            node1.query(f"INSERT INTO {name} VALUES(toDate('{event_date}'), {number})")
        assert_table_passes_check()

        used_disks = get_used_disks_for_table(node1, name)
        assert all(
            disk.startswith("jbod") for disk in used_disks
        ), "All writes should go to jbods"

        first_part = node1.query(
            f"SELECT name FROM system.parts WHERE table = '{name}' and active = 1 ORDER BY modification_time LIMIT 1"
        ).strip()

        # Move the oldest part to the 'external' volume.
        time.sleep(1)
        node1.query(f"ALTER TABLE {name} MOVE PART '{first_part}' TO VOLUME 'external'")
        assert_table_passes_check()
        assert disk_of_part(first_part) == "external"
        part_path = get_path_for_part_from_part_log(node1, name, first_part)
        assert part_path.startswith("/external")

        # Move the same part to disk 'jbod1'.
        time.sleep(1)
        node1.query(f"ALTER TABLE {name} MOVE PART '{first_part}' TO DISK 'jbod1'")
        assert_table_passes_check()
        assert disk_of_part(first_part) == "jbod1"
        part_path = get_path_for_part_from_part_log(node1, name, first_part)
        assert part_path.startswith("/jbod1")

        # Move the whole 201904 partition to the 'external' volume.
        time.sleep(1)
        node1.query(f"ALTER TABLE {name} MOVE PARTITION 201904 TO VOLUME 'external'")
        assert_table_passes_check()
        disks = disks_of_partition_201904()
        assert len(disks) == 2
        assert all(disk == "external" for disk in disks)
        assert all(
            path.startswith("/external")
            for path in newest_two_paths_of_partition_201904()
        )

        # Move the whole 201904 partition to disk 'jbod2'.
        time.sleep(1)
        node1.query(f"ALTER TABLE {name} MOVE PARTITION 201904 TO DISK 'jbod2'")
        assert_table_passes_check()
        disks = disks_of_partition_201904()
        assert len(disks) == 2
        assert all(disk == "jbod2" for disk in disks)
        assert all(
            path.startswith("/jbod2")
            for path in newest_two_paths_of_partition_201904()
        )

        # No rows may be lost by the moves.
        assert node1.query(f"SELECT COUNT() FROM {name}") == "4\n"
    finally:
        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
2019-08-20 19:04:58 +00:00
@pytest.mark.parametrize("volume_or_disk", ["DISK", "VOLUME"])
def test_alter_move_half_of_partition(start_cluster, volume_or_disk):
    """Check MOVE PARTITION when one of its two parts is already on the target.

    One part is first moved to 'external' on its own; moving the whole 201903
    partition to 'external' (as DISK or VOLUME) must then leave both parts
    there and keep both rows.
    """
    name = "alter_move_half_of_partition"
    engine = "MergeTree()"

    def query_lines(query):
        return node1.query(query).splitlines()

    try:
        node1.query(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                EventDate Date,
                number UInt64
            ) ENGINE = {engine}
            ORDER BY tuple()
            PARTITION BY toYYYYMM(EventDate)
            SETTINGS storage_policy='jbods_with_external'
        """
        )

        node1.query(f"SYSTEM STOP MERGES {name}")

        node1.query(f"INSERT INTO {name} VALUES(toDate('2019-03-15'), 65)")
        node1.query(f"INSERT INTO {name} VALUES(toDate('2019-03-16'), 42)")
        used_disks = get_used_disks_for_table(node1, name)
        assert all(
            disk.startswith("jbod") for disk in used_disks
        ), "All writes should go to jbods"

        time.sleep(1)
        parts = query_lines(
            f"SELECT name FROM system.parts WHERE table = '{name}' and active = 1"
        )
        assert len(parts) == 2

        # Move only the first listed part.
        moved_part = parts[0]
        node1.query(f"ALTER TABLE {name} MOVE PART '{moved_part}' TO VOLUME 'external'")
        disks = query_lines(
            f"SELECT disk_name FROM system.parts WHERE table = '{name}' and name = '{moved_part}' and active = 1"
        )
        assert disks == ["external"]

        # Move the whole partition; both parts must end up on 'external'.
        time.sleep(1)
        node1.query(
            f"ALTER TABLE {name} MOVE PARTITION 201903 TO {volume_or_disk} 'external'"
        )
        disks = query_lines(
            f"SELECT disk_name FROM system.parts WHERE table = '{name}' and partition = '201903' and active = 1"
        )
        assert disks == ["external"] * 2

        assert node1.query(f"SELECT COUNT() FROM {name}") == "2\n"
    finally:
        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
@pytest.mark.parametrize("volume_or_disk", ["DISK", "VOLUME"])
def test_alter_double_move_partition(start_cluster, volume_or_disk):
    """Moving a partition to the place it already occupies must fail.

    Two single-row parts are written into partition 201903 with merges
    stopped, and the partition is moved to 'external' (addressed as DISK or
    VOLUME). Repeating exactly the same move is expected to raise
    QueryRuntimeException.
    """
    name = "alter_double_move_partition"
    engine = "MergeTree()"
    move_partition = (
        f"ALTER TABLE {name} MOVE PARTITION 201903 TO {volume_or_disk} 'external'"
    )

    try:
        node1.query(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                EventDate Date,
                number UInt64
            ) ENGINE = {engine}
            ORDER BY tuple()
            PARTITION BY toYYYYMM(EventDate)
            SETTINGS storage_policy='jbods_with_external'
        """
        )

        node1.query(f"SYSTEM STOP MERGES {name}")

        for day, value in (("2019-03-15", 65), ("2019-03-16", 42)):
            node1.query(f"INSERT INTO {name} VALUES(toDate('{day}'), {value})")

        used_disks = get_used_disks_for_table(node1, name)
        assert all(
            disk.startswith("jbod") for disk in used_disks
        ), "All writes should go to jbods"

        time.sleep(1)

        # First move: succeeds and puts both parts on 'external'.
        node1.query(move_partition)
        partition_disks = node1.query(
            f"SELECT disk_name FROM system.parts WHERE table = '{name}' and partition = '201903' and active = 1"
        ).splitlines()
        assert partition_disks == ["external"] * 2

        assert node1.query(f"SELECT COUNT() FROM {name}") == "2\n"

        time.sleep(1)

        # Second, identical move: must be rejected.
        with pytest.raises(QueryRuntimeException):
            node1.query(move_partition)

    finally:
        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
2019-08-21 13:06:01 +00:00
def produce_alter_move(node, name):
    """Run one randomly chosen ``ALTER TABLE ... MOVE`` on table `name`.

    The statement moves either a random active PART of the table or one of
    the partitions 201903 / 201904, to a random DISK ('external', 'jbod1',
    'jbod2') or VOLUME ('main', 'external').

    Args:
        node: instance whose ``query`` method executes the statements.
        name: table to operate on.

    Raises:
        Exception: if the list of active parts could not be selected in
            10 attempts.

    A QueryRuntimeException raised by the ALTER itself is deliberately
    ignored: callers fire these moves concurrently with other operations,
    so a particular random move is allowed to fail.
    """
    move_type = random.choice(["PART", "PARTITION"])
    if move_type == "PART":
        # for/else: the else branch only runs if no attempt reached `break`.
        for _ in range(10):
            try:
                parts = (
                    # Use the node we were given, not the module-level node1.
                    node.query(
                        "SELECT name from system.parts where table = '{}' and active = 1".format(
                            name
                        )
                    )
                    .strip()
                    .split("\n")
                )
                break
            except QueryRuntimeException:
                pass
        else:
            raise Exception("Cannot select from system.parts")

        move_part = random.choice(["'" + part + "'" for part in parts])
    else:
        move_part = random.choice([201903, 201904])

    move_disk = random.choice(["DISK", "VOLUME"])
    if move_disk == "DISK":
        move_volume = random.choice(["'external'", "'jbod1'", "'jbod2'"])
    else:
        move_volume = random.choice(["'main'", "'external'"])

    try:
        node.query(
            "ALTER TABLE {} MOVE {mt} {mp} TO {md} {mv}".format(
                name, mt=move_type, mp=move_part, md=move_disk, mv=move_volume
            )
        )
    except QueryRuntimeException:
        # Best effort: a random move may legitimately be rejected.
        pass
2019-08-20 19:04:58 +00:00
@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("concurrently_altering_mt", "MergeTree()", id="mt"),
        pytest.param(
            "concurrently_altering_replicated_mt",
            "ReplicatedMergeTree('/clickhouse/concurrently_altering_replicated_mt', '1')",
            id="replicated",
        ),
    ],
)
def test_concurrent_alter_move(start_cluster, name, engine):
    """Run inserts, random moves, updates and OPTIMIZE FINAL concurrently.

    Five workers of each kind perform 100 operations each on a thread pool.
    Afterwards the server must still answer queries and the table must hold
    exactly the 500 inserted rows.
    """
    try:
        node1.query_with_retry(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                EventDate Date,
                number UInt64
            ) ENGINE = {engine}
            ORDER BY tuple()
            PARTITION BY toYYYYMM(EventDate)
            SETTINGS storage_policy='jbods_with_external'
        """
        )

        # Pool of distinct values shared by all insert workers.
        values = list(set(random.randint(1, 1000000) for _ in range(1000)))

        def insert(num):
            for _ in range(num):
                day = random.randint(11, 30)
                value = values.pop()
                month = "0" + str(random.choice([3, 4]))
                node1.query_with_retry(
                    f"INSERT INTO {name} VALUES(toDate('2019-{month}-{day}'), {value})"
                )

        def alter_move(num):
            for _ in range(num):
                produce_alter_move(node1, name)

        def alter_update(num):
            for _ in range(num):
                node1.query(f"ALTER TABLE {name} UPDATE number = number + 1 WHERE 1")

        def optimize_table(num):
            for _ in range(num):
                node1.query_with_retry(f"OPTIMIZE TABLE {name} FINAL")

        pool = Pool(15)
        workers = (insert, alter_move, alter_update, optimize_table)
        # Submission order matches five rounds of one worker of each kind.
        tasks = [
            pool.apply_async(worker, (100,)) for _ in range(5) for worker in workers
        ]

        for task in tasks:
            task.get(timeout=240)

        assert node1.query("SELECT 1") == "1\n"
        assert node1.query(f"SELECT COUNT() FROM {name}") == "500\n"
    finally:
        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("concurrently_dropping_mt", "MergeTree()", id="mt"),
        pytest.param(
            "concurrently_dropping_replicated_mt",
            "ReplicatedMergeTree('/clickhouse/concurrently_dropping_replicated_mt', '1')",
            id="replicated",
        ),
    ],
)
def test_concurrent_alter_move_and_drop(start_cluster, name, engine):
    """Run inserts, random moves and partition drop/detach concurrently.

    After 20 sequential inserts, five workers of each kind perform 20
    operations each on a thread pool. The only check at the end is that the
    server still answers ``SELECT 1``.
    """
    try:
        node1.query(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                EventDate Date,
                number UInt64
            ) ENGINE = {engine}
            ORDER BY tuple()
            PARTITION BY toYYYYMM(EventDate)
            SETTINGS storage_policy='jbods_with_external'
        """
        )

        # Pool of distinct values shared by all insert workers.
        values = list(set(random.randint(1, 1000000) for _ in range(1000)))

        def insert(num):
            for _ in range(num):
                day = random.randint(11, 30)
                value = values.pop()
                month = "0" + str(random.choice([3, 4]))
                node1.query_with_retry(
                    f"INSERT INTO {name} VALUES(toDate('2019-{month}-{day}'), {value})"
                )

        def alter_move(num):
            for _ in range(num):
                produce_alter_move(node1, name)

        def alter_drop(num):
            for _ in range(num):
                partition = random.choice([201903, 201904])
                op = random.choice(["drop", "detach"])
                try:
                    node1.query(f"ALTER TABLE {name} {op} PARTITION {partition}")
                except QueryRuntimeException as e:
                    # Only errors reported as "Code: 650" are tolerated here.
                    if "Code: 650" not in e.stderr:
                        raise

        insert(20)

        pool = Pool(15)
        workers = (insert, alter_move, alter_drop)
        tasks = [
            pool.apply_async(worker, (20,)) for _ in range(5) for worker in workers
        ]

        for task in tasks:
            task.get(timeout=120)

        assert node1.query("SELECT 1") == "1\n"

    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
2019-08-21 13:06:01 +00:00
@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("detach_attach_mt", "MergeTree()", id="mt"),
        pytest.param(
            "replicated_detach_attach_mt",
            "ReplicatedMergeTree('/clickhouse/replicated_detach_attach_mt', '1')",
            id="replicated",
        ),
    ],
)
def test_detach_attach(start_cluster, name, engine):
    """Detach and re-attach the only partition of a multi-disk table.

    Five rows are inserted in one query. After DETACH the table must be
    empty and system.detached_parts must report disk 'jbod1'; after ATTACH
    all five rows must be back.
    """
    try:
        node1.query_with_retry(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                s1 String
            ) ENGINE = {engine}
            ORDER BY tuple()
            SETTINGS storage_policy='moving_jbod_with_external'
        """
        )

        # 5 rows built from get_random_string(1024 * 1024): 5MB in total.
        rows = [get_random_string(1024 * 1024) for _ in range(5)]
        values_clause = ",".join(f"({row})" for row in rows)
        node1.query_with_retry(f"INSERT INTO {name} VALUES {values_clause}")

        node1.query(f"ALTER TABLE {name} DETACH PARTITION tuple()")
        assert node1.query(f"SELECT count() FROM {name}").strip() == "0"

        detached_disk = node1.query(
            f"SELECT disk FROM system.detached_parts WHERE table = '{name}'"
        ).strip()
        assert detached_disk == "jbod1"

        node1.query(f"ALTER TABLE {name} ATTACH PARTITION tuple()")
        assert node1.query(f"SELECT count() FROM {name}").strip() == "5"

    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("mutating_mt", "MergeTree()", id="mt"),
        pytest.param(
            "replicated_mutating_mt",
            "ReplicatedMergeTree('/clickhouse/replicated_mutating_mt', '1')",
            id="replicated",
        ),
    ],
)
def test_mutate_to_another_disk(start_cluster, name, engine):
    """Mutate a table stored under the 'moving_jbod_with_external' policy.

    Five inserts of five rows each are followed by an UPDATE that appends
    'x' to every value. If system.mutations reports no failure reason, all
    25 rows must end with 'x'. Otherwise the table is optimized, the UPDATE
    is issued again and the same check is made.
    """

    def wait_for_mutations():
        # Poll up to 20 times, 0.5s apart, until no unfinished mutation is
        # listed. The query is not restricted to this table.
        retry = 20
        while (
            node1.query("SELECT * FROM system.mutations WHERE is_done = 0") != ""
            and retry > 0
        ):
            retry -= 1
            time.sleep(0.5)

    def assert_all_rows_updated():
        assert (
            node1.query(f"SELECT sum(endsWith(s1, 'x')) FROM {name}") == "25\n"
        )

    try:
        node1.query_with_retry(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                s1 String
            ) ENGINE = {engine}
            ORDER BY tuple()
            SETTINGS storage_policy='moving_jbod_with_external'
        """
        )

        for _ in range(5):
            # 5 rows built from get_random_string(1024 * 1024) per insert.
            rows = [get_random_string(1024 * 1024) for _ in range(5)]
            values_clause = ",".join(f"({row})" for row in rows)
            node1.query_with_retry(f"INSERT INTO {name} VALUES {values_clause}")

        update_query = f"ALTER TABLE {name} UPDATE s1 = concat(s1, 'x') WHERE 1"
        node1.query(update_query)
        wait_for_mutations()

        fail_reason = node1.query(
            f"SELECT latest_fail_reason FROM system.mutations WHERE table = '{name}'"
        )
        if fail_reason == "":
            assert_all_rows_updated()
        else:  # mutation failed, let's try on another disk
            print("Mutation failed")
            node1.query_with_retry(f"OPTIMIZE TABLE {name} FINAL")
            node1.query(update_query)
            wait_for_mutations()
            assert_all_rows_updated()

    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
2019-09-04 17:26:53 +00:00
@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("alter_modifying_mt", "MergeTree()", id="mt"),
        pytest.param(
            "replicated_alter_modifying_mt",
            "ReplicatedMergeTree('/clickhouse/replicated_alter_modifying_mt', '1')",
            id="replicated",
        ),
    ],
)
def test_concurrent_alter_modify(start_cluster, name, engine):
    """Run random moves concurrently with MODIFY COLUMN on the same table.

    100 rows are inserted first. Then five move workers and five
    modify-column workers perform 100 operations each on a thread pool. The
    server must still answer queries and the row count must stay at 100.
    """
    try:
        node1.query_with_retry(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                EventDate Date,
                number UInt64
            ) ENGINE = {engine}
            ORDER BY tuple()
            PARTITION BY toYYYYMM(EventDate)
            SETTINGS storage_policy='jbods_with_external'
        """
        )

        values = list(set(random.randint(1, 1000000) for _ in range(1000)))

        def insert(num):
            for _ in range(num):
                day = random.randint(11, 30)
                value = values.pop()
                month = "0" + str(random.choice([3, 4]))
                node1.query_with_retry(
                    f"INSERT INTO {name} VALUES(toDate('2019-{month}-{day}'), {value})"
                )

        def alter_move(num):
            for _ in range(num):
                produce_alter_move(node1, name)

        def alter_modify(num):
            for _ in range(num):
                column_type = random.choice(["UInt64", "String"])
                try:
                    node1.query(
                        f"ALTER TABLE {name} MODIFY COLUMN number {column_type}"
                    )
                except:
                    # Any failure is tolerated for the replicated engine
                    # only; for plain MergeTree it is propagated.
                    if "Replicated" not in engine:
                        raise

        insert(100)

        assert node1.query(f"SELECT COUNT() FROM {name}") == "100\n"

        pool = Pool(50)
        tasks = [
            pool.apply_async(worker, (100,))
            for _ in range(5)
            for worker in (alter_move, alter_modify)
        ]

        for task in tasks:
            task.get(timeout=120)

        assert node1.query("SELECT 1") == "1\n"
        assert node1.query(f"SELECT COUNT() FROM {name}") == "100\n"

    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
2019-07-15 09:36:02 +00:00
2019-09-06 15:09:20 +00:00
def test_simple_replication_and_moves(start_cluster):
    """Replicate a table across two nodes and check where its parts live.

    Phase 1: one insert worker (20 inserts of 2 rows, each on a random
    replica) runs concurrently with one OPTIMIZE FINAL worker. After syncing
    the replicas both must report 40 rows.

    Phase 2: merges are stopped, one more 2-row insert goes to each replica,
    and the disks used by the table are sampled on both nodes. Both nodes are
    expected to use exactly 'jbod1' and 'external'.

    The row-count and disk-set comparisons used to be bare expressions with no
    ``assert`` and therefore verified nothing; they are real assertions now.
    """
    try:
        for replica, node in enumerate([node1, node2], start=1):
            node.query_with_retry(
                f"""
                CREATE TABLE IF NOT EXISTS replicated_table_for_moves (
                    s1 String
                ) ENGINE = ReplicatedMergeTree('/clickhouse/replicated_table_for_moves', '{replica}')
                ORDER BY tuple()
                SETTINGS storage_policy='moving_jbod_with_external', old_parts_lifetime=1,
                cleanup_delay_period=1, cleanup_delay_period_random_add=2, cleanup_thread_preferred_points_per_iteration=0
            """
            )

        def two_row_values():
            # VALUES clause of two rows built from get_random_string(512 * 1024).
            rows = [get_random_string(512 * 1024) for _ in range(2)]
            return ",".join("(" + row + ")" for row in rows)

        def insert(num):
            for _ in range(num):
                node = random.choice([node1, node2])
                node.query_with_retry(
                    "INSERT INTO replicated_table_for_moves VALUES {}".format(
                        two_row_values()
                    )
                )

        def optimize(num):
            for _ in range(num):
                node = random.choice([node1, node2])
                node.query_with_retry("OPTIMIZE TABLE replicated_table_for_moves FINAL")

        p = Pool(60)
        tasks = [
            p.apply_async(insert, (20,)),
            p.apply_async(optimize, (20,)),
        ]

        for task in tasks:
            task.get(timeout=60)

        node1.query_with_retry(
            "SYSTEM SYNC REPLICA ON CLUSTER test_cluster replicated_table_for_moves",
            timeout=5,
        )

        # 20 inserts x 2 rows, visible on both replicas after the sync.
        assert node1.query("SELECT COUNT() FROM replicated_table_for_moves") == "40\n"
        assert node2.query("SELECT COUNT() FROM replicated_table_for_moves") == "40\n"

        # The same two rows are inserted on both replicas below.
        final_values = two_row_values()

        time.sleep(3)  # wait until old parts will be deleted
        node1.query("SYSTEM STOP MERGES")
        node2.query("SYSTEM STOP MERGES")

        node1.query_with_retry(
            "INSERT INTO replicated_table_for_moves VALUES {}".format(final_values)
        )
        node2.query_with_retry(
            "INSERT INTO replicated_table_for_moves VALUES {}".format(final_values)
        )

        time.sleep(3)  # nothing was moved

        disks1 = get_used_disks_for_table(node1, "replicated_table_for_moves")
        disks2 = get_used_disks_for_table(node2, "replicated_table_for_moves")

        # Re-enable merges before asserting so that a failed check does not
        # leave merges stopped for the tests that follow.
        node2.query("SYSTEM START MERGES ON CLUSTER test_cluster")

        assert set(disks1) == {"jbod1", "external"}
        assert set(disks2) == {"jbod1", "external"}
    finally:
        for node in [node1, node2]:
            node.query("DROP TABLE IF EXISTS replicated_table_for_moves SYNC")
2019-09-09 12:28:28 +00:00
2019-09-09 12:28:28 +00:00
def test_download_appropriate_disk(start_cluster):
    """Check which disk a replica uses for data it gets from the other replica.

    Fifty rows built from get_random_string(1024 * 1024) are inserted on
    node1 in one query. After node2 syncs the replica, all of its parts of
    the table are expected to be on disk 'external'.
    """
    table = "replicated_table_for_download"
    try:
        for replica, node in enumerate([node1, node2], start=1):
            node.query_with_retry(
                f"""
                CREATE TABLE IF NOT EXISTS replicated_table_for_download (
                    s1 String
                ) ENGINE = ReplicatedMergeTree('/clickhouse/replicated_table_for_download', '{replica}')
                ORDER BY tuple()
                SETTINGS storage_policy='moving_jbod_with_external', old_parts_lifetime=1,
                cleanup_delay_period=1, cleanup_delay_period_random_add=2, cleanup_thread_preferred_points_per_iteration=0
            """
            )

        rows = [get_random_string(1024 * 1024) for _ in range(50)]  # 1MB value each
        values_clause = ",".join(f"({row})" for row in rows)
        node1.query_with_retry(
            f"INSERT INTO replicated_table_for_download VALUES {values_clause}"
        )

        # Up to 10 sync attempts; any failure just triggers a short pause.
        for _ in range(10):
            try:
                print("Syncing replica")
                node2.query_with_retry(
                    "SYSTEM SYNC REPLICA replicated_table_for_download"
                )
                break
            except:
                time.sleep(0.5)

        node2_disks = get_used_disks_for_table(node2, table)

        assert set(node2_disks) == {"external"}

    finally:
        for node in [node1, node2]:
            node.query_with_retry(
                "DROP TABLE IF EXISTS replicated_table_for_download SYNC"
            )
def test_rename(start_cluster):
    """Rename a table whose parts are spread over several disks.

    Fifty rows are inserted in five queries with merges stopped. The table is
    then renamed within the `default` database and afterwards moved to the
    `test` database. After each rename all 50 rows must be readable under the
    new name and selecting from the old name must fail.
    """
    try:
        node1.query(
            """
            CREATE TABLE IF NOT EXISTS default.renaming_table (
                s String
            ) ENGINE = MergeTree
            ORDER BY tuple()
            SETTINGS storage_policy='small_jbod_with_external'
        """
        )

        # We want to check that after inserts, some parts were moved to external disk
        # and some parts are still on the main disk, but because of merge all parts
        # might end up on external disk.
        node1.query("SYSTEM STOP MERGES default.renaming_table")

        # jbod1 disk is 40mb
        for _ in range(5):
            data = [get_random_string(1024 * 1024) for _ in range(10)]  # 1MB value
            node1.query(
                "INSERT INTO renaming_table VALUES {}".format(
                    ",".join(["(" + x + ")" for x in data])
                )
            )

        # data is moved in the background, so check with retries
        #
        # The original condition compared the helper's result itself to 1,
        # which is never true, so no retry ever happened. Compare the number
        # of distinct disks instead; set() keeps this correct even if the
        # helper returns one entry per part (other tests in this file wrap
        # its result in set() as well).
        num_try = 0
        while len(set(get_used_disks_for_table(node1, "renaming_table"))) == 1:
            time.sleep(1)
            num_try += 1
            if num_try == 20:
                break
        assert len(get_used_disks_for_table(node1, "renaming_table")) > 1

        assert node1.query("SELECT COUNT() FROM default.renaming_table") == "50\n"

        node1.query("RENAME TABLE default.renaming_table TO default.renaming_table1")
        assert node1.query("SELECT COUNT() FROM default.renaming_table1") == "50\n"

        with pytest.raises(QueryRuntimeException):
            node1.query("SELECT COUNT() FROM default.renaming_table")

        node1.query("CREATE DATABASE IF NOT EXISTS test")
        node1.query("RENAME TABLE default.renaming_table1 TO test.renaming_table2")
        assert node1.query("SELECT COUNT() FROM test.renaming_table2") == "50\n"

        with pytest.raises(QueryRuntimeException):
            node1.query("SELECT COUNT() FROM default.renaming_table1")

    finally:
        # Drop every name the table may have ended up with.
        node1.query("DROP TABLE IF EXISTS default.renaming_table SYNC")
        node1.query("DROP TABLE IF EXISTS default.renaming_table1 SYNC")
        node1.query("DROP TABLE IF EXISTS test.renaming_table2 SYNC")
def test_freeze(start_cluster):
    """FREEZE a partition whose parts sit on more than one disk.

    Fifty rows dated 2019-03-05 are inserted in five queries with merges
    stopped. After ``FREEZE PARTITION 201903`` a ``*.mrk2`` file must be
    found under the shadow directory of both /jbod1 and /external.
    """
    try:
        node1.query(
            """
            CREATE TABLE IF NOT EXISTS default.freezing_table (
                d Date,
                s String
            ) ENGINE = MergeTree
            ORDER BY tuple()
            PARTITION BY toYYYYMM(d)
            SETTINGS storage_policy='small_jbod_with_external', compress_marks=false, compress_primary_key=false
        """
        )

        node1.query("SYSTEM STOP MERGES default.freezing_table")

        for _ in range(5):
            # Ten (date, value) rows per insert, 1MB value each.
            rows = [
                f"(toDate('2019-03-05'), {get_random_string(1024 * 1024)})"
                for _ in range(10)
            ]
            node1.query(f"INSERT INTO freezing_table VALUES {','.join(rows)}")

        used_disks = get_used_disks_for_table(node1, "freezing_table")
        assert len(used_disks) > 1
        assert node1.query("SELECT COUNT() FROM default.freezing_table") == "50\n"

        node1.query("ALTER TABLE freezing_table FREEZE PARTITION 201903")

        # check shadow files (backups) exists
        # `grep '.*'` makes the pipeline fail when find prints nothing.
        for shadow_dir in ("/jbod1/shadow", "/external/shadow"):
            node1.exec_in_container(
                ["bash", "-c", f"find {shadow_dir} -name '*.mrk2' | grep '.*'"]
            )

    finally:
        node1.query("DROP TABLE IF EXISTS default.freezing_table SYNC")
        node1.exec_in_container(["rm", "-rf", "/jbod1/shadow", "/external/shadow"])
def test_kill_while_insert(start_cluster):
    """Kill the server right after moving a partition that is being read.

    Ten rows are inserted and must land on 'jbod1'. While a background
    thread runs ``SELECT sleep(3)`` over the table, the partition is moved
    to disk 'external'; the move must return less than 2 seconds after the
    select thread was created. The server is then restarted with
    ``kill=True`` and all 10 rows must still be present.
    """
    try:
        name = "test_kill_while_insert"

        node1.query(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                s String
            ) ENGINE = MergeTree
            ORDER BY tuple()
            SETTINGS storage_policy='small_jbod_with_external'
        """
        )

        rows = [get_random_string(1024 * 1024) for _ in range(10)]  # 1MB value
        values_clause = ",".join(f"({row})" for row in rows)
        node1.query(f"INSERT INTO {name} VALUES {values_clause}")

        used_disks = get_used_disks_for_table(node1, name)
        assert set(used_disks) == {"jbod1"}

        def ignore_exceptions(f, *args):
            # The long select may fail once the server is killed; that
            # failure is irrelevant to the test.
            try:
                f(*args)
            except:
                pass

        start_time = time.time()
        long_select = threading.Thread(
            target=ignore_exceptions,
            args=(node1.query, f"SELECT sleep(3) FROM {name}"),
        )
        long_select.start()

        time.sleep(0.5)

        node1.query(f"ALTER TABLE {name} MOVE PARTITION tuple() TO DISK 'external'")
        elapsed = time.time() - start_time
        assert elapsed < 2
        node1.restart_clickhouse(kill=True)

        try:
            long_select.join()
        except:
            pass

        row_count = node1.query(f"SELECT count() FROM {name}").splitlines()
        assert row_count == ["10"]

    finally:
        try:
            node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
        except:
            # ClickHouse may be inactive at this moment and we don't want to
            # mask a meaningful exception.
            pass
def test_move_while_merge(start_cluster):
    """A part must not be movable while and after it takes part in a merge.

    The table is ordered by ``sleep(2)``. OPTIMIZE is started in a thread
    over two single-row parts, and moving the first source part to disk
    'external' is expected to fail while it runs. A second thread keeps
    retrying the same move until it is told to stop; it must never succeed.
    Both rows must still be present at the end.
    """
    try:
        name = "test_move_while_merge"

        node1.query(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                n Int64
            ) ENGINE = MergeTree
            ORDER BY sleep(2)
            SETTINGS storage_policy='small_jbod_with_external'
        """
        )

        node1.query(f"INSERT INTO {name} VALUES (1)")
        node1.query(f"INSERT INTO {name} VALUES (2)")

        parts = get_used_parts_for_table(node1, name)
        assert len(parts) == 2

        move_first_part = (
            f"ALTER TABLE {name} MOVE PART '{parts[0]}' TO DISK 'external'"
        )

        def run_optimize():
            node1.query(f"OPTIMIZE TABLE {name}")

        optimize_thread = threading.Thread(target=run_optimize)
        optimize_thread.start()

        time.sleep(0.5)

        with pytest.raises(QueryRuntimeException):
            node1.query(move_first_part)

        stop_requested = False
        unexpected_successes = {}

        def alter():
            # Retry the move until it succeeds (recorded as a failure of the
            # test) or the main thread flips `stop_requested`.
            while not stop_requested:
                try:
                    node1.query(move_first_part)
                except QueryRuntimeException:
                    continue
                unexpected_successes["missing"] = "exception"
                break

        alter_thread = threading.Thread(target=alter)
        alter_thread.start()

        optimize_thread.join()

        time.sleep(0.5)

        stop_requested = True
        alter_thread.join()
        assert len(unexpected_successes) == 0

        row_count = node1.query(f"SELECT count() FROM {name}").splitlines()
        assert row_count == ["2"]

    finally:
        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
def test_move_across_policies_does_not_work(start_cluster):
    """Parts must not travel between tables with different storage policies.

    Table `{name}` uses 'jbods_with_external' and `{name}2` uses
    'small_jbod_with_external'. After one row is inserted into the first
    table and its partition is moved to 'jbod2', ATTACH PARTITION FROM,
    REPLACE PARTITION FROM and MOVE PARTITION TO TABLE between the two must
    each fail with the expected message. The source table must keep its row.
    """
    try:
        name = "test_move_across_policies_does_not_work"

        for table, policy in (
            (name, "jbods_with_external"),
            (f"{name}2", "small_jbod_with_external"),
        ):
            node1.query(
                f"""
                CREATE TABLE IF NOT EXISTS {table} (
                    n Int64
                ) ENGINE = MergeTree
                ORDER BY tuple()
                SETTINGS storage_policy='{policy}'
            """
            )

        node1.query(f"""INSERT INTO {name} VALUES (1)""")
        try:
            node1.query(
                f"""ALTER TABLE {name} MOVE PARTITION tuple() TO DISK 'jbod2'"""
            )
        except QueryRuntimeException:
            # All parts of partition 'all' are already on disk 'jbod2'.
            pass

        wrong_disk = ".*because disk does not belong to storage policy.*"
        wrong_policy = ".*should have the same storage policy of source table.*"

        with pytest.raises(QueryRuntimeException, match=wrong_disk):
            node1.query(
                f"""ALTER TABLE {name}2 ATTACH PARTITION tuple() FROM {name}"""
            )

        with pytest.raises(QueryRuntimeException, match=wrong_disk):
            node1.query(
                f"""ALTER TABLE {name}2 REPLACE PARTITION tuple() FROM {name}"""
            )

        with pytest.raises(QueryRuntimeException, match=wrong_policy):
            node1.query(
                f"""ALTER TABLE {name} MOVE PARTITION tuple() TO TABLE {name}2"""
            )

        remaining_rows = node1.query(f"""SELECT * FROM {name}""").splitlines()
        assert remaining_rows == ["1"]

    finally:
        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
        node1.query(f"DROP TABLE IF EXISTS {name}2 SYNC")
def _insert_merge_execute(
    node, name, policy, parts, cmds, parts_before_cmds, parts_after_cmds
):
    """Insert parts, OPTIMIZE, run `cmds`, OPTIMIZE again and count parts.

    Args:
        node: instance to run the queries on.
        name: table to create; it is dropped at the end.
        policy: storage policy for the table.
        parts: number of single-row inserts to perform.
        cmds: queries executed between the two OPTIMIZE statements.
        parts_before_cmds: expected part count after the first OPTIMIZE.
        parts_after_cmds: expected part count after the second OPTIMIZE.

    The table has ``TTL now()-1 TO VOLUME 'external'`` and every inserted
    part is asserted to be on disk 'external'.
    """
    optimize_query = f"""OPTIMIZE TABLE {name}"""
    try:
        node.query(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                n Int64
            ) ENGINE = MergeTree
            ORDER BY tuple()
            PARTITION BY tuple()
            TTL now()-1 TO VOLUME 'external'
            SETTINGS storage_policy='{policy}'
        """
        )

        for n in range(parts):
            node.query(f"""INSERT INTO {name} VALUES ({n})""")

        used_disks = get_used_disks_for_table(node, name)
        assert set(used_disks) == {"external"}

        node.query(optimize_query)

        parts_now = get_used_parts_for_table(node, name)
        assert len(parts_now) == parts_before_cmds

        for cmd in cmds:
            node.query(cmd)

        node.query(optimize_query)

        parts_now = get_used_parts_for_table(node, name)
        assert len(parts_now) == parts_after_cmds

    finally:
        node.query(f"DROP TABLE IF EXISTS {name} SYNC")
def _check_merges_are_working(node, storage_policy, volume, shall_work):
    """Check whether OPTIMIZE FINAL merges parts that live on `volume`.

    Creates a scratch table under `storage_policy`, writes 24 single-row
    parts, moves them to `volume` and runs ``OPTIMIZE TABLE ... FINAL``.

    Args:
        node: instance to run the queries on.
        storage_policy: storage policy for the scratch table.
        volume: volume the parts are moved to before optimizing.
        shall_work: True if the parts are expected to be merged into one,
            False if all 24 parts are expected to remain.
    """
    try:
        name = "_check_merges_are_working_{storage_policy}_{volume}".format(
            storage_policy=storage_policy, volume=volume
        )

        node.query(
            """
            CREATE TABLE IF NOT EXISTS {name} (
                n Int64
            ) ENGINE = MergeTree
            ORDER BY tuple()
            PARTITION BY tuple()
            SETTINGS storage_policy='{storage_policy}'
        """.format(
                name=name, storage_policy=storage_policy
            )
        )

        created_parts = 24

        for i in range(created_parts):
            node.query("""INSERT INTO {name} VALUES ({n})""".format(name=name, n=i))
            try:
                node.query(
                    """ALTER TABLE {name} MOVE PARTITION tuple() TO VOLUME '{volume}' """.format(
                        name=name, volume=volume
                    )
                )
            except QueryRuntimeException:
                # Ignore 'nothing to move'.
                pass

        expected_disks = set(
            node.query(
                """
            SELECT disks FROM system.storage_policies ARRAY JOIN disks WHERE volume_name = '{volume_name}'
        """.format(
                    volume_name=volume
                )
            ).splitlines()
        )

        disks = get_used_disks_for_table(node, name)
        assert set(disks) <= expected_disks

        node.query("""OPTIMIZE TABLE {name} FINAL""".format(name=name))

        parts = get_used_parts_for_table(node, name)
        # Parenthesised on purpose: without the parentheses the conditional
        # expression swallows the comparison and the shall_work=False case
        # asserts the truthy constant `created_parts`, which never fails.
        expected_parts = 1 if shall_work else created_parts
        assert len(parts) == expected_parts

    finally:
        node.query(f"DROP TABLE IF EXISTS {name} SYNC")
def _get_prefer_not_to_merge_for_storage_policy(node, storage_policy):
    """Return the ``prefer_not_to_merge`` flags of a storage policy.

    Reads system.storage_policies on `node` for `storage_policy`, ordered by
    volume_priority, and returns one int per output line.
    """
    output = node.query(
        f"SELECT prefer_not_to_merge FROM system.storage_policies WHERE policy_name = '{storage_policy}' ORDER BY volume_priority"
    )
    return [int(line) for line in output.splitlines()]
def test_simple_merge_tree_merges_are_disabled(start_cluster):
    """Merges must not work on volume 'external' of the no-merges policy."""
    policy = "small_jbod_with_external_no_merges"
    _check_merges_are_working(node1, policy, "external", False)
def test_no_merges_in_configuration_allow_from_query_without_reload(start_cluster):
    """Enable merges on a volume by query, without reloading the config.

    After a kill-restart the policy must report prefer_not_to_merge flags
    [0, 1] and merges must not work on 'external'. Running
    ``SYSTEM START MERGES ON VOLUME`` between the two OPTIMIZE statements
    must reduce two parts to one; afterwards the flags must be [0, 0] and
    merges must work. Merges on the volume are stopped again at the end.
    """
    try:
        name = "test_no_merges_in_configuration_allow_from_query_without_reload"
        policy = "small_jbod_with_external_no_merges"
        node1.restart_clickhouse(kill=True)
        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1]
        _check_merges_are_working(node1, policy, "external", False)

        commands = [f"SYSTEM START MERGES ON VOLUME {policy}.external"]
        _insert_merge_execute(node1, name, policy, 2, commands, 2, 1)

        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0]
        _check_merges_are_working(node1, policy, "external", True)

    finally:
        node1.query(f"SYSTEM STOP MERGES ON VOLUME {policy}.external")
def test_no_merges_in_configuration_allow_from_query_with_reload(start_cluster):
    """Enable merges on a volume by query, followed by a config reload.

    Same scenario as the variant without reload, except that
    ``SYSTEM RELOAD CONFIG`` is executed right after
    ``SYSTEM START MERGES ON VOLUME``. The flags must still end up as
    [0, 0] and merges must work. Merges on the volume are stopped again at
    the end.
    """
    try:
        name = "test_no_merges_in_configuration_allow_from_query_with_reload"
        policy = "small_jbod_with_external_no_merges"
        node1.restart_clickhouse(kill=True)
        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1]
        _check_merges_are_working(node1, policy, "external", False)

        commands = [
            f"SYSTEM START MERGES ON VOLUME {policy}.external",
            "SYSTEM RELOAD CONFIG",
        ]
        _insert_merge_execute(node1, name, policy, 2, commands, 2, 1)

        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0]
        _check_merges_are_working(node1, policy, "external", True)

    finally:
        node1.query(f"SYSTEM STOP MERGES ON VOLUME {policy}.external")
def test_no_merges_in_configuration_allow_from_query_with_reload_on_cluster(
    start_cluster,
):
    """Enable merges on a volume with ON CLUSTER queries plus a reload.

    Same scenario as the single-node variant with reload, but both the
    START MERGES and the RELOAD CONFIG statements are issued
    ``ON CLUSTER test_cluster``. All checks are made on node1. Merges on the
    volume are stopped again, also ON CLUSTER, at the end.
    """
    try:
        name = "test_no_merges_in_configuration_allow_from_query_with_reload"
        policy = "small_jbod_with_external_no_merges"
        node1.restart_clickhouse(kill=True)
        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1]
        _check_merges_are_working(node1, policy, "external", False)

        commands = [
            f"SYSTEM START MERGES ON CLUSTER test_cluster ON VOLUME {policy}.external",
            "SYSTEM RELOAD CONFIG ON CLUSTER test_cluster",
        ]
        _insert_merge_execute(node1, name, policy, 2, commands, 2, 1)

        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0]
        _check_merges_are_working(node1, policy, "external", True)

    finally:
        node1.query(
            f"SYSTEM STOP MERGES ON CLUSTER test_cluster ON VOLUME {policy}.external"
        )
2022-01-28 06:30:57 +00:00
def test_yes_merges_in_configuration_disallow_from_query_without_reload(start_cluster):
    """Disable merges on a volume by query, without reloading the config.

    After a kill-restart the policy must report prefer_not_to_merge flags
    [0, 0] and merges must work on 'external'. The first OPTIMIZE must
    reduce two parts to one; after ``SYSTEM STOP MERGES ON VOLUME`` and one
    more insert, the second OPTIMIZE must leave two parts. Afterwards the
    flags must be [0, 1] and merges must not work. Merges on the volume are
    started again at the end.
    """
    try:
        name = "test_yes_merges_in_configuration_allow_from_query_without_reload"
        policy = "small_jbod_with_external"
        node1.restart_clickhouse(kill=True)
        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0]
        _check_merges_are_working(node1, policy, "external", True)

        commands = [
            f"SYSTEM STOP MERGES ON VOLUME {policy}.external",
            f"INSERT INTO {name} VALUES (2)",
        ]
        _insert_merge_execute(node1, name, policy, 2, commands, 1, 2)

        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1]
        _check_merges_are_working(node1, policy, "external", False)

    finally:
        node1.query(f"SYSTEM START MERGES ON VOLUME {policy}.external")
def test_yes_merges_in_configuration_disallow_from_query_with_reload(start_cluster):
    """Disable merges on a volume by query, followed by a config reload.

    Same scenario as the variant without reload, except that
    ``SYSTEM RELOAD CONFIG`` is executed after the extra insert. The flags
    must still end up as [0, 1] and merges must not work. Merges on the
    volume are started again at the end.
    """
    try:
        name = "test_yes_merges_in_configuration_allow_from_query_with_reload"
        policy = "small_jbod_with_external"
        node1.restart_clickhouse(kill=True)
        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0]
        _check_merges_are_working(node1, policy, "external", True)

        commands = [
            f"SYSTEM STOP MERGES ON VOLUME {policy}.external",
            f"INSERT INTO {name} VALUES (2)",
            "SYSTEM RELOAD CONFIG",
        ]
        _insert_merge_execute(node1, name, policy, 2, commands, 1, 2)

        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1]
        _check_merges_are_working(node1, policy, "external", False)

    finally:
        node1.query(f"SYSTEM START MERGES ON VOLUME {policy}.external")