mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 09:32:01 +00:00
b75963d370
This PR formats all the `*.py` files found under the `tests/integration` folder. It also reorders the imports and cleans up a bunch of unused imports. The formatting also takes care of other things like wrapping lines and fixing spaces and indents such that the tests look more readable.
1348 lines
50 KiB
Python
1348 lines
50 KiB
Python
import json
|
|
import random
|
|
import re
|
|
import string
|
|
import threading
|
|
import time
|
|
from multiprocessing.dummy import Pool
|
|
|
|
import pytest
|
|
from helpers.client import QueryRuntimeException
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
|
|
node1 = cluster.add_instance('node1',
|
|
main_configs=['configs/logs_config.xml', 'configs/config.d/storage_configuration.xml',
|
|
'configs/config.d/cluster.xml'],
|
|
with_zookeeper=True,
|
|
stay_alive=True,
|
|
tmpfs=['/jbod1:size=40M', '/jbod2:size=40M', '/external:size=200M'],
|
|
macros={"shard": 0, "replica": 1})
|
|
|
|
node2 = cluster.add_instance('node2',
|
|
main_configs=['configs/logs_config.xml', 'configs/config.d/storage_configuration.xml',
|
|
'configs/config.d/cluster.xml'],
|
|
with_zookeeper=True,
|
|
stay_alive=True,
|
|
tmpfs=['/jbod1:size=40M', '/jbod2:size=40M', '/external:size=200M'],
|
|
macros={"shard": 0, "replica": 2})
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def start_cluster():
|
|
try:
|
|
cluster.start()
|
|
yield cluster
|
|
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def test_system_tables(start_cluster):
|
|
expected_disks_data = [
|
|
{
|
|
"name": "default",
|
|
"path": "/var/lib/clickhouse/",
|
|
"keep_free_space": '1024',
|
|
},
|
|
{
|
|
"name": "jbod1",
|
|
"path": "/jbod1/",
|
|
"keep_free_space": '0',
|
|
},
|
|
{
|
|
"name": "jbod2",
|
|
"path": "/jbod2/",
|
|
"keep_free_space": '10485760',
|
|
},
|
|
{
|
|
"name": "external",
|
|
"path": "/external/",
|
|
"keep_free_space": '0',
|
|
}
|
|
]
|
|
|
|
click_disk_data = json.loads(node1.query("SELECT name, path, keep_free_space FROM system.disks FORMAT JSON"))[
|
|
"data"]
|
|
assert sorted(click_disk_data, key=lambda x: x["name"]) == sorted(expected_disks_data, key=lambda x: x["name"])
|
|
|
|
expected_policies_data = [
|
|
{
|
|
"policy_name": "small_jbod_with_external",
|
|
"volume_name": "main",
|
|
"volume_priority": "1",
|
|
"disks": ["jbod1"],
|
|
"volume_type": "JBOD",
|
|
"max_data_part_size": "0",
|
|
"move_factor": 0.1,
|
|
},
|
|
{
|
|
"policy_name": "small_jbod_with_external",
|
|
"volume_name": "external",
|
|
"volume_priority": "2",
|
|
"disks": ["external"],
|
|
"volume_type": "JBOD",
|
|
"max_data_part_size": "0",
|
|
"move_factor": 0.1,
|
|
},
|
|
{
|
|
"policy_name": "one_more_small_jbod_with_external",
|
|
"volume_name": "m",
|
|
"volume_priority": "1",
|
|
"disks": ["jbod1"],
|
|
"volume_type": "JBOD",
|
|
"max_data_part_size": "0",
|
|
"move_factor": 0.1,
|
|
},
|
|
{
|
|
"policy_name": "one_more_small_jbod_with_external",
|
|
"volume_name": "e",
|
|
"volume_priority": "2",
|
|
"disks": ["external"],
|
|
"volume_type": "JBOD",
|
|
"max_data_part_size": "0",
|
|
"move_factor": 0.1,
|
|
},
|
|
{
|
|
"policy_name": "jbods_with_external",
|
|
"volume_name": "main",
|
|
"volume_priority": "1",
|
|
"disks": ["jbod1", "jbod2"],
|
|
"volume_type": "JBOD",
|
|
"max_data_part_size": "10485760",
|
|
"move_factor": 0.1,
|
|
},
|
|
{
|
|
"policy_name": "jbods_with_external",
|
|
"volume_name": "external",
|
|
"volume_priority": "2",
|
|
"disks": ["external"],
|
|
"volume_type": "JBOD",
|
|
"max_data_part_size": "0",
|
|
"move_factor": 0.1,
|
|
},
|
|
{
|
|
"policy_name": "moving_jbod_with_external",
|
|
"volume_name": "main",
|
|
"volume_priority": "1",
|
|
"disks": ["jbod1"],
|
|
"volume_type": "JBOD",
|
|
"max_data_part_size": "0",
|
|
"move_factor": 0.7,
|
|
},
|
|
{
|
|
"policy_name": "moving_jbod_with_external",
|
|
"volume_name": "external",
|
|
"volume_priority": "2",
|
|
"disks": ["external"],
|
|
"volume_type": "JBOD",
|
|
"max_data_part_size": "0",
|
|
"move_factor": 0.7,
|
|
},
|
|
{
|
|
"policy_name": "default_disk_with_external",
|
|
"volume_name": "small",
|
|
"volume_priority": "1",
|
|
"disks": ["default"],
|
|
"volume_type": "JBOD",
|
|
"max_data_part_size": "2097152",
|
|
"move_factor": 0.1,
|
|
},
|
|
{
|
|
"policy_name": "default_disk_with_external",
|
|
"volume_name": "big",
|
|
"volume_priority": "2",
|
|
"disks": ["external"],
|
|
"volume_type": "JBOD",
|
|
"max_data_part_size": "20971520",
|
|
"move_factor": 0.1,
|
|
},
|
|
{
|
|
"policy_name": "special_warning_policy",
|
|
"volume_name": "special_warning_zero_volume",
|
|
"volume_priority": "1",
|
|
"disks": ["default"],
|
|
"volume_type": "JBOD",
|
|
"max_data_part_size": "0",
|
|
"move_factor": 0.1,
|
|
},
|
|
{
|
|
"policy_name": "special_warning_policy",
|
|
"volume_name": "special_warning_default_volume",
|
|
"volume_priority": "2",
|
|
"disks": ["external"],
|
|
"volume_type": "JBOD",
|
|
"max_data_part_size": "0",
|
|
"move_factor": 0.1,
|
|
},
|
|
{
|
|
"policy_name": "special_warning_policy",
|
|
"volume_name": "special_warning_small_volume",
|
|
"volume_priority": "3",
|
|
"disks": ["jbod1"],
|
|
"volume_type": "JBOD",
|
|
"max_data_part_size": "1024",
|
|
"move_factor": 0.1,
|
|
},
|
|
{
|
|
"policy_name": "special_warning_policy",
|
|
"volume_name": "special_warning_big_volume",
|
|
"volume_priority": "4",
|
|
"disks": ["jbod2"],
|
|
"volume_type": "JBOD",
|
|
"max_data_part_size": "1024000000",
|
|
"move_factor": 0.1,
|
|
},
|
|
]
|
|
|
|
clickhouse_policies_data = \
|
|
json.loads(node1.query("SELECT * FROM system.storage_policies WHERE policy_name != 'default' FORMAT JSON"))[
|
|
"data"]
|
|
|
|
def key(x):
|
|
return (x["policy_name"], x["volume_name"], x["volume_priority"])
|
|
|
|
assert sorted(clickhouse_policies_data, key=key) == sorted(expected_policies_data, key=key)
|
|
|
|
|
|
def test_query_parser(start_cluster):
|
|
try:
|
|
with pytest.raises(QueryRuntimeException):
|
|
node1.query("""
|
|
CREATE TABLE table_with_absent_policy (
|
|
d UInt64
|
|
) ENGINE = MergeTree()
|
|
ORDER BY d
|
|
SETTINGS storage_policy='very_exciting_policy'
|
|
""")
|
|
|
|
with pytest.raises(QueryRuntimeException):
|
|
node1.query("""
|
|
CREATE TABLE table_with_absent_policy (
|
|
d UInt64
|
|
) ENGINE = MergeTree()
|
|
ORDER BY d
|
|
SETTINGS storage_policy='jbod1'
|
|
""")
|
|
|
|
node1.query("""
|
|
CREATE TABLE table_with_normal_policy (
|
|
d UInt64
|
|
) ENGINE = MergeTree()
|
|
ORDER BY d
|
|
SETTINGS storage_policy='default'
|
|
""")
|
|
|
|
node1.query("INSERT INTO table_with_normal_policy VALUES (5)")
|
|
|
|
with pytest.raises(QueryRuntimeException):
|
|
node1.query("ALTER TABLE table_with_normal_policy MOVE PARTITION tuple() TO VOLUME 'some_volume'")
|
|
|
|
with pytest.raises(QueryRuntimeException):
|
|
node1.query("ALTER TABLE table_with_normal_policy MOVE PARTITION tuple() TO DISK 'some_volume'")
|
|
|
|
with pytest.raises(QueryRuntimeException):
|
|
node1.query("ALTER TABLE table_with_normal_policy MOVE PART 'xxxxx' TO DISK 'jbod1'")
|
|
|
|
with pytest.raises(QueryRuntimeException):
|
|
node1.query("ALTER TABLE table_with_normal_policy MOVE PARTITION 'yyyy' TO DISK 'jbod1'")
|
|
|
|
with pytest.raises(QueryRuntimeException):
|
|
node1.query(
|
|
"ALTER TABLE table_with_normal_policy MODIFY SETTING storage_policy='moving_jbod_with_external'")
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS table_with_normal_policy")
|
|
|
|
|
|
@pytest.mark.parametrize("name,engine", [
|
|
("test_alter_policy", "MergeTree()"),
|
|
("replicated_test_alter_policy", "ReplicatedMergeTree('/clickhouse/test_alter_policy', '1')",),
|
|
])
|
|
def test_alter_policy(start_cluster, name, engine):
|
|
try:
|
|
node1.query("""
|
|
CREATE TABLE {name} (
|
|
d UInt64
|
|
) ENGINE = {engine}
|
|
ORDER BY d
|
|
SETTINGS storage_policy='small_jbod_with_external'
|
|
""".format(name=name, engine=engine))
|
|
|
|
assert node1.query("""SELECT storage_policy FROM system.tables WHERE name = '{name}'""".format(
|
|
name=name)) == "small_jbod_with_external\n"
|
|
|
|
with pytest.raises(QueryRuntimeException):
|
|
node1.query(
|
|
"""ALTER TABLE {name} MODIFY SETTING storage_policy='one_more_small_jbod_with_external'""".format(
|
|
name=name))
|
|
|
|
assert node1.query("""SELECT storage_policy FROM system.tables WHERE name = '{name}'""".format(
|
|
name=name)) == "small_jbod_with_external\n"
|
|
|
|
node1.query("""ALTER TABLE {name} MODIFY SETTING storage_policy='jbods_with_external'""".format(name=name))
|
|
|
|
assert node1.query("""SELECT storage_policy FROM system.tables WHERE name = '{name}'""".format(
|
|
name=name)) == "jbods_with_external\n"
|
|
|
|
with pytest.raises(QueryRuntimeException):
|
|
node1.query(
|
|
"""ALTER TABLE {name} MODIFY SETTING storage_policy='small_jbod_with_external'""".format(name=name))
|
|
|
|
assert node1.query("""SELECT storage_policy FROM system.tables WHERE name = '{name}'""".format(
|
|
name=name)) == "jbods_with_external\n"
|
|
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
|
|
|
|
|
def get_random_string(length):
|
|
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length))
|
|
|
|
|
|
def get_used_disks_for_table(node, table_name):
|
|
return node.query(
|
|
"select disk_name from system.parts where table == '{}' and active=1 order by modification_time".format(
|
|
table_name)).strip().split('\n')
|
|
|
|
|
|
def test_no_warning_about_zero_max_data_part_size(start_cluster):
|
|
def get_log(node):
|
|
return node.exec_in_container(["bash", "-c", "cat /var/log/clickhouse-server/clickhouse-server.log"])
|
|
|
|
for node in (node1, node2):
|
|
node.query("""
|
|
CREATE TABLE default.test_warning_table (
|
|
s String
|
|
) ENGINE = MergeTree
|
|
ORDER BY tuple()
|
|
SETTINGS storage_policy='small_jbod_with_external'
|
|
""")
|
|
node.query("""
|
|
DROP TABLE default.test_warning_table
|
|
""")
|
|
log = get_log(node)
|
|
assert not re.search("Warning.*Volume.*special_warning_zero_volume", log)
|
|
assert not re.search("Warning.*Volume.*special_warning_default_volume", log)
|
|
assert re.search("Warning.*Volume.*special_warning_small_volume", log)
|
|
assert not re.search("Warning.*Volume.*special_warning_big_volume", log)
|
|
|
|
|
|
@pytest.mark.parametrize("name,engine", [
|
|
("mt_on_jbod", "MergeTree()"),
|
|
("replicated_mt_on_jbod", "ReplicatedMergeTree('/clickhouse/replicated_mt_on_jbod', '1')",),
|
|
])
|
|
def test_round_robin(start_cluster, name, engine):
|
|
try:
|
|
node1.query("""
|
|
CREATE TABLE {name} (
|
|
d UInt64
|
|
) ENGINE = {engine}
|
|
ORDER BY d
|
|
SETTINGS storage_policy='jbods_with_external'
|
|
""".format(name=name, engine=engine))
|
|
|
|
# first should go to the jbod1
|
|
node1.query("insert into {} select * from numbers(10000)".format(name))
|
|
used_disk = get_used_disks_for_table(node1, name)
|
|
assert len(used_disk) == 1, 'More than one disk used for single insert'
|
|
|
|
node1.query("insert into {} select * from numbers(10000, 10000)".format(name))
|
|
used_disks = get_used_disks_for_table(node1, name)
|
|
|
|
assert len(used_disks) == 2, 'Two disks should be used for two parts'
|
|
assert used_disks[0] != used_disks[1], "Should write to different disks"
|
|
|
|
node1.query("insert into {} select * from numbers(20000, 10000)".format(name))
|
|
used_disks = get_used_disks_for_table(node1, name)
|
|
|
|
# jbod1 -> jbod2 -> jbod1 -> jbod2 ... etc
|
|
assert len(used_disks) == 3
|
|
assert used_disks[0] != used_disks[1]
|
|
assert used_disks[2] == used_disks[0]
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS {}".format(name))
|
|
|
|
|
|
@pytest.mark.parametrize("name,engine", [
|
|
("mt_with_huge_part", "MergeTree()"),
|
|
("replicated_mt_with_huge_part", "ReplicatedMergeTree('/clickhouse/replicated_mt_with_huge_part', '1')",),
|
|
])
|
|
def test_max_data_part_size(start_cluster, name, engine):
|
|
try:
|
|
node1.query("""
|
|
CREATE TABLE {name} (
|
|
s1 String
|
|
) ENGINE = {engine}
|
|
ORDER BY tuple()
|
|
SETTINGS storage_policy='jbods_with_external'
|
|
""".format(name=name, engine=engine))
|
|
data = [] # 10MB in total
|
|
for i in range(10):
|
|
data.append(get_random_string(1024 * 1024)) # 1MB row
|
|
|
|
node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
|
|
used_disks = get_used_disks_for_table(node1, name)
|
|
assert len(used_disks) == 1
|
|
assert used_disks[0] == 'external'
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS {}".format(name))
|
|
|
|
|
|
@pytest.mark.parametrize("name,engine", [
|
|
("mt_with_overflow", "MergeTree()"),
|
|
("replicated_mt_with_overflow", "ReplicatedMergeTree('/clickhouse/replicated_mt_with_overflow', '1')",),
|
|
])
|
|
def test_jbod_overflow(start_cluster, name, engine):
|
|
try:
|
|
node1.query("""
|
|
CREATE TABLE {name} (
|
|
s1 String
|
|
) ENGINE = {engine}
|
|
ORDER BY tuple()
|
|
SETTINGS storage_policy='small_jbod_with_external'
|
|
""".format(name=name, engine=engine))
|
|
|
|
node1.query("SYSTEM STOP MERGES")
|
|
|
|
# small jbod size is 40MB, so lets insert 5MB batch 7 times
|
|
for i in range(7):
|
|
data = [] # 5MB in total
|
|
for i in range(5):
|
|
data.append(get_random_string(1024 * 1024)) # 1MB row
|
|
node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
|
|
|
|
used_disks = get_used_disks_for_table(node1, name)
|
|
assert all(disk == 'jbod1' for disk in used_disks)
|
|
|
|
# should go to the external disk (jbod is overflown)
|
|
data = [] # 10MB in total
|
|
for i in range(10):
|
|
data.append(get_random_string(1024 * 1024)) # 1MB row
|
|
|
|
node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
|
|
|
|
used_disks = get_used_disks_for_table(node1, name)
|
|
|
|
assert used_disks[-1] == 'external'
|
|
|
|
node1.query("SYSTEM START MERGES")
|
|
time.sleep(1)
|
|
|
|
node1.query("OPTIMIZE TABLE {} FINAL".format(name))
|
|
time.sleep(2)
|
|
|
|
disks_for_merges = node1.query(
|
|
"SELECT disk_name FROM system.parts WHERE table == '{}' AND level >= 1 and active = 1 ORDER BY modification_time".format(
|
|
name)).strip().split('\n')
|
|
|
|
assert all(disk == 'external' for disk in disks_for_merges)
|
|
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS {}".format(name))
|
|
|
|
|
|
@pytest.mark.parametrize("name,engine", [
|
|
("moving_mt", "MergeTree()"),
|
|
("moving_replicated_mt", "ReplicatedMergeTree('/clickhouse/moving_replicated_mt', '1')",),
|
|
])
|
|
def test_background_move(start_cluster, name, engine):
|
|
try:
|
|
node1.query("""
|
|
CREATE TABLE {name} (
|
|
s1 String
|
|
) ENGINE = {engine}
|
|
ORDER BY tuple()
|
|
SETTINGS storage_policy='moving_jbod_with_external'
|
|
""".format(name=name, engine=engine))
|
|
|
|
for i in range(5):
|
|
data = [] # 5MB in total
|
|
for i in range(5):
|
|
data.append(get_random_string(1024 * 1024)) # 1MB row
|
|
# small jbod size is 40MB, so lets insert 5MB batch 5 times
|
|
node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
|
|
|
|
used_disks = get_used_disks_for_table(node1, name)
|
|
|
|
retry = 20
|
|
i = 0
|
|
while not sum(1 for x in used_disks if x == 'jbod1') <= 2 and i < retry:
|
|
time.sleep(0.5)
|
|
used_disks = get_used_disks_for_table(node1, name)
|
|
i += 1
|
|
|
|
assert sum(1 for x in used_disks if x == 'jbod1') <= 2
|
|
|
|
# first (oldest) part was moved to external
|
|
assert used_disks[0] == 'external'
|
|
|
|
path = node1.query(
|
|
"SELECT path_on_disk FROM system.part_log WHERE table = '{}' AND event_type='MovePart' ORDER BY event_time LIMIT 1".format(
|
|
name))
|
|
|
|
# first (oldest) part was moved to external
|
|
assert path.startswith("/external")
|
|
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
|
|
|
|
|
@pytest.mark.parametrize("name,engine", [
|
|
("stopped_moving_mt", "MergeTree()"),
|
|
("stopped_moving_replicated_mt", "ReplicatedMergeTree('/clickhouse/stopped_moving_replicated_mt', '1')",),
|
|
])
|
|
def test_start_stop_moves(start_cluster, name, engine):
|
|
try:
|
|
node1.query("""
|
|
CREATE TABLE {name} (
|
|
s1 String
|
|
) ENGINE = {engine}
|
|
ORDER BY tuple()
|
|
SETTINGS storage_policy='moving_jbod_with_external'
|
|
""".format(name=name, engine=engine))
|
|
|
|
node1.query("INSERT INTO {} VALUES ('HELLO')".format(name))
|
|
node1.query("INSERT INTO {} VALUES ('WORLD')".format(name))
|
|
|
|
used_disks = get_used_disks_for_table(node1, name)
|
|
assert all(d == "jbod1" for d in used_disks), "All writes shoud go to jbods"
|
|
|
|
first_part = node1.query(
|
|
"SELECT name FROM system.parts WHERE table = '{}' and active = 1 ORDER BY modification_time LIMIT 1".format(
|
|
name)).strip()
|
|
|
|
node1.query("SYSTEM STOP MOVES")
|
|
|
|
with pytest.raises(QueryRuntimeException):
|
|
node1.query("ALTER TABLE {} MOVE PART '{}' TO VOLUME 'external'".format(name, first_part))
|
|
|
|
used_disks = get_used_disks_for_table(node1, name)
|
|
assert all(d == "jbod1" for d in used_disks), "Blocked moves doesn't actually move something"
|
|
|
|
node1.query("SYSTEM START MOVES")
|
|
|
|
node1.query("ALTER TABLE {} MOVE PART '{}' TO VOLUME 'external'".format(name, first_part))
|
|
|
|
disk = node1.query(
|
|
"SELECT disk_name FROM system.parts WHERE table = '{}' and name = '{}' and active = 1".format(name,
|
|
first_part)).strip()
|
|
|
|
assert disk == "external"
|
|
|
|
node1.query("TRUNCATE TABLE {}".format(name))
|
|
|
|
node1.query("SYSTEM STOP MOVES {}".format(name))
|
|
node1.query("SYSTEM STOP MERGES {}".format(name))
|
|
|
|
for i in range(5):
|
|
data = [] # 5MB in total
|
|
for i in range(5):
|
|
data.append(get_random_string(1024 * 1024)) # 1MB row
|
|
# jbod size is 40MB, so lets insert 5MB batch 7 times
|
|
node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
|
|
|
|
used_disks = get_used_disks_for_table(node1, name)
|
|
|
|
retry = 5
|
|
i = 0
|
|
while not sum(1 for x in used_disks if x == 'jbod1') <= 2 and i < retry:
|
|
time.sleep(0.1)
|
|
used_disks = get_used_disks_for_table(node1, name)
|
|
i += 1
|
|
|
|
# first (oldest) part doesn't move anywhere
|
|
assert used_disks[0] == 'jbod1'
|
|
|
|
node1.query("SYSTEM START MOVES {}".format(name))
|
|
|
|
# wait sometime until background backoff finishes
|
|
retry = 30
|
|
i = 0
|
|
while not sum(1 for x in used_disks if x == 'jbod1') <= 2 and i < retry:
|
|
time.sleep(1)
|
|
used_disks = get_used_disks_for_table(node1, name)
|
|
i += 1
|
|
|
|
node1.query("SYSTEM START MERGES {}".format(name))
|
|
|
|
assert sum(1 for x in used_disks if x == 'jbod1') <= 2
|
|
|
|
# first (oldest) part moved to external
|
|
assert used_disks[0] == 'external'
|
|
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
|
|
|
|
|
def get_path_for_part_from_part_log(node, table, part_name):
|
|
node.query("SYSTEM FLUSH LOGS")
|
|
path = node.query(
|
|
"SELECT path_on_disk FROM system.part_log WHERE table = '{}' and part_name = '{}' ORDER BY event_time DESC LIMIT 1".format(
|
|
table, part_name))
|
|
return path.strip()
|
|
|
|
|
|
def get_paths_for_partition_from_part_log(node, table, partition_id):
|
|
node.query("SYSTEM FLUSH LOGS")
|
|
paths = node.query(
|
|
"SELECT path_on_disk FROM system.part_log WHERE table = '{}' and partition_id = '{}' ORDER BY event_time DESC".format(
|
|
table, partition_id))
|
|
return paths.strip().split('\n')
|
|
|
|
|
|
@pytest.mark.parametrize("name,engine", [
|
|
("altering_mt", "MergeTree()"),
|
|
# ("altering_replicated_mt","ReplicatedMergeTree('/clickhouse/altering_replicated_mt', '1')",),
|
|
# SYSTEM STOP MERGES doesn't disable merges assignments
|
|
])
|
|
def test_alter_move(start_cluster, name, engine):
|
|
try:
|
|
node1.query("""
|
|
CREATE TABLE {name} (
|
|
EventDate Date,
|
|
number UInt64
|
|
) ENGINE = {engine}
|
|
ORDER BY tuple()
|
|
PARTITION BY toYYYYMM(EventDate)
|
|
SETTINGS storage_policy='jbods_with_external'
|
|
""".format(name=name, engine=engine))
|
|
|
|
node1.query("SYSTEM STOP MERGES {}".format(name)) # to avoid conflicts
|
|
|
|
node1.query("INSERT INTO {} VALUES(toDate('2019-03-15'), 65)".format(name))
|
|
node1.query("INSERT INTO {} VALUES(toDate('2019-03-16'), 66)".format(name))
|
|
node1.query("INSERT INTO {} VALUES(toDate('2019-04-10'), 42)".format(name))
|
|
node1.query("INSERT INTO {} VALUES(toDate('2019-04-11'), 43)".format(name))
|
|
used_disks = get_used_disks_for_table(node1, name)
|
|
assert all(d.startswith("jbod") for d in used_disks), "All writes should go to jbods"
|
|
|
|
first_part = node1.query(
|
|
"SELECT name FROM system.parts WHERE table = '{}' and active = 1 ORDER BY modification_time LIMIT 1".format(
|
|
name)).strip()
|
|
|
|
time.sleep(1)
|
|
node1.query("ALTER TABLE {} MOVE PART '{}' TO VOLUME 'external'".format(name, first_part))
|
|
disk = node1.query(
|
|
"SELECT disk_name FROM system.parts WHERE table = '{}' and name = '{}' and active = 1".format(name,
|
|
first_part)).strip()
|
|
assert disk == 'external'
|
|
assert get_path_for_part_from_part_log(node1, name, first_part).startswith("/external")
|
|
|
|
time.sleep(1)
|
|
node1.query("ALTER TABLE {} MOVE PART '{}' TO DISK 'jbod1'".format(name, first_part))
|
|
disk = node1.query(
|
|
"SELECT disk_name FROM system.parts WHERE table = '{}' and name = '{}' and active = 1".format(name,
|
|
first_part)).strip()
|
|
assert disk == 'jbod1'
|
|
assert get_path_for_part_from_part_log(node1, name, first_part).startswith("/jbod1")
|
|
|
|
time.sleep(1)
|
|
node1.query("ALTER TABLE {} MOVE PARTITION 201904 TO VOLUME 'external'".format(name))
|
|
disks = node1.query(
|
|
"SELECT disk_name FROM system.parts WHERE table = '{}' and partition = '201904' and active = 1".format(
|
|
name)).strip().split('\n')
|
|
assert len(disks) == 2
|
|
assert all(d == "external" for d in disks)
|
|
assert all(
|
|
path.startswith("/external") for path in get_paths_for_partition_from_part_log(node1, name, '201904')[:2])
|
|
|
|
time.sleep(1)
|
|
node1.query("ALTER TABLE {} MOVE PARTITION 201904 TO DISK 'jbod2'".format(name))
|
|
disks = node1.query(
|
|
"SELECT disk_name FROM system.parts WHERE table = '{}' and partition = '201904' and active = 1".format(
|
|
name)).strip().split('\n')
|
|
assert len(disks) == 2
|
|
assert all(d == "jbod2" for d in disks)
|
|
assert all(
|
|
path.startswith("/jbod2") for path in get_paths_for_partition_from_part_log(node1, name, '201904')[:2])
|
|
|
|
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "4\n"
|
|
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
|
|
|
|
|
@pytest.mark.parametrize("volume_or_disk", [
|
|
"DISK",
|
|
"VOLUME"
|
|
])
|
|
def test_alter_move_half_of_partition(start_cluster, volume_or_disk):
|
|
name = "alter_move_half_of_partition"
|
|
engine = "MergeTree()"
|
|
try:
|
|
node1.query("""
|
|
CREATE TABLE {name} (
|
|
EventDate Date,
|
|
number UInt64
|
|
) ENGINE = {engine}
|
|
ORDER BY tuple()
|
|
PARTITION BY toYYYYMM(EventDate)
|
|
SETTINGS storage_policy='jbods_with_external'
|
|
""".format(name=name, engine=engine))
|
|
|
|
node1.query("SYSTEM STOP MERGES {}".format(name))
|
|
|
|
node1.query("INSERT INTO {} VALUES(toDate('2019-03-15'), 65)".format(name))
|
|
node1.query("INSERT INTO {} VALUES(toDate('2019-03-16'), 42)".format(name))
|
|
used_disks = get_used_disks_for_table(node1, name)
|
|
assert all(d.startswith("jbod") for d in used_disks), "All writes should go to jbods"
|
|
|
|
time.sleep(1)
|
|
parts = node1.query("SELECT name FROM system.parts WHERE table = '{}' and active = 1".format(name)).splitlines()
|
|
assert len(parts) == 2
|
|
|
|
node1.query("ALTER TABLE {} MOVE PART '{}' TO VOLUME 'external'".format(name, parts[0]))
|
|
disks = node1.query(
|
|
"SELECT disk_name FROM system.parts WHERE table = '{}' and name = '{}' and active = 1".format(name, parts[
|
|
0])).splitlines()
|
|
assert disks == ["external"]
|
|
|
|
time.sleep(1)
|
|
node1.query("ALTER TABLE {} MOVE PARTITION 201903 TO {volume_or_disk} 'external'".format(name,
|
|
volume_or_disk=volume_or_disk))
|
|
disks = node1.query(
|
|
"SELECT disk_name FROM system.parts WHERE table = '{}' and partition = '201903' and active = 1".format(
|
|
name)).splitlines()
|
|
assert disks == ["external"] * 2
|
|
|
|
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "2\n"
|
|
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
|
|
|
|
|
@pytest.mark.parametrize("volume_or_disk", [
|
|
"DISK",
|
|
"VOLUME"
|
|
])
|
|
def test_alter_double_move_partition(start_cluster, volume_or_disk):
|
|
name = "alter_double_move_partition"
|
|
engine = "MergeTree()"
|
|
try:
|
|
node1.query("""
|
|
CREATE TABLE {name} (
|
|
EventDate Date,
|
|
number UInt64
|
|
) ENGINE = {engine}
|
|
ORDER BY tuple()
|
|
PARTITION BY toYYYYMM(EventDate)
|
|
SETTINGS storage_policy='jbods_with_external'
|
|
""".format(name=name, engine=engine))
|
|
|
|
node1.query("SYSTEM STOP MERGES {}".format(name))
|
|
|
|
node1.query("INSERT INTO {} VALUES(toDate('2019-03-15'), 65)".format(name))
|
|
node1.query("INSERT INTO {} VALUES(toDate('2019-03-16'), 42)".format(name))
|
|
used_disks = get_used_disks_for_table(node1, name)
|
|
assert all(d.startswith("jbod") for d in used_disks), "All writes should go to jbods"
|
|
|
|
time.sleep(1)
|
|
node1.query("ALTER TABLE {} MOVE PARTITION 201903 TO {volume_or_disk} 'external'".format(name,
|
|
volume_or_disk=volume_or_disk))
|
|
disks = node1.query(
|
|
"SELECT disk_name FROM system.parts WHERE table = '{}' and partition = '201903' and active = 1".format(
|
|
name)).splitlines()
|
|
assert disks == ["external"] * 2
|
|
|
|
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "2\n"
|
|
|
|
time.sleep(1)
|
|
with pytest.raises(QueryRuntimeException):
|
|
node1.query("ALTER TABLE {} MOVE PARTITION 201903 TO {volume_or_disk} 'external'".format(name,
|
|
volume_or_disk=volume_or_disk))
|
|
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
|
|
|
|
|
def produce_alter_move(node, name):
|
|
move_type = random.choice(["PART", "PARTITION"])
|
|
if move_type == "PART":
|
|
for _ in range(10):
|
|
try:
|
|
parts = node1.query(
|
|
"SELECT name from system.parts where table = '{}' and active = 1".format(name)).strip().split('\n')
|
|
break
|
|
except QueryRuntimeException:
|
|
pass
|
|
else:
|
|
raise Exception("Cannot select from system.parts")
|
|
|
|
move_part = random.choice(["'" + part + "'" for part in parts])
|
|
else:
|
|
move_part = random.choice([201903, 201904])
|
|
|
|
move_disk = random.choice(["DISK", "VOLUME"])
|
|
if move_disk == "DISK":
|
|
move_volume = random.choice(["'external'", "'jbod1'", "'jbod2'"])
|
|
else:
|
|
move_volume = random.choice(["'main'", "'external'"])
|
|
try:
|
|
node1.query("ALTER TABLE {} MOVE {mt} {mp} TO {md} {mv}".format(
|
|
name, mt=move_type, mp=move_part, md=move_disk, mv=move_volume))
|
|
except QueryRuntimeException as ex:
|
|
pass
|
|
|
|
|
|
@pytest.mark.parametrize("name,engine", [
|
|
("concurrently_altering_mt", "MergeTree()"),
|
|
("concurrently_altering_replicated_mt",
|
|
"ReplicatedMergeTree('/clickhouse/concurrently_altering_replicated_mt', '1')",),
|
|
])
|
|
def test_concurrent_alter_move(start_cluster, name, engine):
|
|
try:
|
|
node1.query("""
|
|
CREATE TABLE {name} (
|
|
EventDate Date,
|
|
number UInt64
|
|
) ENGINE = {engine}
|
|
ORDER BY tuple()
|
|
PARTITION BY toYYYYMM(EventDate)
|
|
SETTINGS storage_policy='jbods_with_external'
|
|
""".format(name=name, engine=engine))
|
|
|
|
values = list({random.randint(1, 1000000) for _ in range(0, 1000)})
|
|
|
|
def insert(num):
|
|
for i in range(num):
|
|
day = random.randint(11, 30)
|
|
value = values.pop()
|
|
month = '0' + str(random.choice([3, 4]))
|
|
node1.query("INSERT INTO {} VALUES(toDate('2019-{m}-{d}'), {v})".format(name, m=month, d=day, v=value))
|
|
|
|
def alter_move(num):
|
|
for i in range(num):
|
|
produce_alter_move(node1, name)
|
|
|
|
def alter_update(num):
|
|
for i in range(num):
|
|
node1.query("ALTER TABLE {} UPDATE number = number + 1 WHERE 1".format(name))
|
|
|
|
def optimize_table(num):
|
|
for i in range(num):
|
|
node1.query("OPTIMIZE TABLE {} FINAL".format(name))
|
|
|
|
p = Pool(15)
|
|
tasks = []
|
|
for i in range(5):
|
|
tasks.append(p.apply_async(insert, (100,)))
|
|
tasks.append(p.apply_async(alter_move, (100,)))
|
|
tasks.append(p.apply_async(alter_update, (100,)))
|
|
tasks.append(p.apply_async(optimize_table, (100,)))
|
|
|
|
for task in tasks:
|
|
task.get(timeout=120)
|
|
|
|
assert node1.query("SELECT 1") == "1\n"
|
|
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "500\n"
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
|
|
|
|
|
@pytest.mark.parametrize("name,engine", [
|
|
("concurrently_dropping_mt", "MergeTree()"),
|
|
("concurrently_dropping_replicated_mt",
|
|
"ReplicatedMergeTree('/clickhouse/concurrently_dropping_replicated_mt', '1')",),
|
|
])
|
|
def test_concurrent_alter_move_and_drop(start_cluster, name, engine):
|
|
try:
|
|
node1.query("""
|
|
CREATE TABLE {name} (
|
|
EventDate Date,
|
|
number UInt64
|
|
) ENGINE = {engine}
|
|
ORDER BY tuple()
|
|
PARTITION BY toYYYYMM(EventDate)
|
|
SETTINGS storage_policy='jbods_with_external'
|
|
""".format(name=name, engine=engine))
|
|
|
|
values = list({random.randint(1, 1000000) for _ in range(0, 1000)})
|
|
|
|
def insert(num):
|
|
for i in range(num):
|
|
day = random.randint(11, 30)
|
|
value = values.pop()
|
|
month = '0' + str(random.choice([3, 4]))
|
|
node1.query("INSERT INTO {} VALUES(toDate('2019-{m}-{d}'), {v})".format(name, m=month, d=day, v=value))
|
|
|
|
def alter_move(num):
|
|
for i in range(num):
|
|
produce_alter_move(node1, name)
|
|
|
|
def alter_drop(num):
|
|
for i in range(num):
|
|
partition = random.choice([201903, 201904])
|
|
drach = random.choice(["drop", "detach"])
|
|
node1.query("ALTER TABLE {} {} PARTITION {}".format(name, drach, partition))
|
|
|
|
insert(100)
|
|
p = Pool(15)
|
|
tasks = []
|
|
for i in range(5):
|
|
tasks.append(p.apply_async(insert, (100,)))
|
|
tasks.append(p.apply_async(alter_move, (100,)))
|
|
tasks.append(p.apply_async(alter_drop, (100,)))
|
|
|
|
for task in tasks:
|
|
task.get(timeout=60)
|
|
|
|
assert node1.query("SELECT 1") == "1\n"
|
|
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
|
|
|
|
|
@pytest.mark.parametrize("name,engine", [
|
|
("detach_attach_mt", "MergeTree()"),
|
|
("replicated_detach_attach_mt", "ReplicatedMergeTree('/clickhouse/replicated_detach_attach_mt', '1')",),
|
|
])
|
|
def test_detach_attach(start_cluster, name, engine):
|
|
try:
|
|
node1.query("""
|
|
CREATE TABLE {name} (
|
|
s1 String
|
|
) ENGINE = {engine}
|
|
ORDER BY tuple()
|
|
SETTINGS storage_policy='moving_jbod_with_external'
|
|
""".format(name=name, engine=engine))
|
|
|
|
data = [] # 5MB in total
|
|
for i in range(5):
|
|
data.append(get_random_string(1024 * 1024)) # 1MB row
|
|
node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
|
|
|
|
node1.query("ALTER TABLE {} DETACH PARTITION tuple()".format(name))
|
|
assert node1.query("SELECT count() FROM {}".format(name)).strip() == "0"
|
|
|
|
assert node1.query("SELECT disk FROM system.detached_parts WHERE table = '{}'".format(name)).strip() == "jbod1"
|
|
|
|
node1.query("ALTER TABLE {} ATTACH PARTITION tuple()".format(name))
|
|
assert node1.query("SELECT count() FROM {}".format(name)).strip() == "5"
|
|
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
|
|
|
|
|
@pytest.mark.parametrize("name,engine", [
|
|
("mutating_mt", "MergeTree()"),
|
|
("replicated_mutating_mt", "ReplicatedMergeTree('/clickhouse/replicated_mutating_mt', '1')",),
|
|
])
|
|
def test_mutate_to_another_disk(start_cluster, name, engine):
|
|
try:
|
|
node1.query("""
|
|
CREATE TABLE {name} (
|
|
s1 String
|
|
) ENGINE = {engine}
|
|
ORDER BY tuple()
|
|
SETTINGS storage_policy='moving_jbod_with_external'
|
|
""".format(name=name, engine=engine))
|
|
|
|
for i in range(5):
|
|
data = [] # 5MB in total
|
|
for i in range(5):
|
|
data.append(get_random_string(1024 * 1024)) # 1MB row
|
|
node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
|
|
|
|
node1.query("ALTER TABLE {} UPDATE s1 = concat(s1, 'x') WHERE 1".format(name))
|
|
|
|
retry = 20
|
|
while node1.query("SELECT * FROM system.mutations WHERE is_done = 0") != "" and retry > 0:
|
|
retry -= 1
|
|
time.sleep(0.5)
|
|
|
|
if node1.query("SELECT latest_fail_reason FROM system.mutations WHERE table = '{}'".format(name)) == "":
|
|
assert node1.query("SELECT sum(endsWith(s1, 'x')) FROM {}".format(name)) == "25\n"
|
|
else: # mutation failed, let's try on another disk
|
|
print "Mutation failed"
|
|
node1.query("OPTIMIZE TABLE {} FINAL".format(name))
|
|
node1.query("ALTER TABLE {} UPDATE s1 = concat(s1, 'x') WHERE 1".format(name))
|
|
retry = 20
|
|
while node1.query("SELECT * FROM system.mutations WHERE is_done = 0") != "" and retry > 0:
|
|
retry -= 1
|
|
time.sleep(0.5)
|
|
|
|
assert node1.query("SELECT sum(endsWith(s1, 'x')) FROM {}".format(name)) == "25\n"
|
|
|
|
|
|
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
|
|
|
|
|
@pytest.mark.parametrize("name,engine", [
|
|
("alter_modifying_mt", "MergeTree()"),
|
|
("replicated_alter_modifying_mt", "ReplicatedMergeTree('/clickhouse/replicated_alter_modifying_mt', '1')",),
|
|
])
|
|
def test_concurrent_alter_modify(start_cluster, name, engine):
|
|
try:
|
|
node1.query("""
|
|
CREATE TABLE {name} (
|
|
EventDate Date,
|
|
number UInt64
|
|
) ENGINE = {engine}
|
|
ORDER BY tuple()
|
|
PARTITION BY toYYYYMM(EventDate)
|
|
SETTINGS storage_policy='jbods_with_external'
|
|
""".format(name=name, engine=engine))
|
|
|
|
values = list({random.randint(1, 1000000) for _ in range(0, 1000)})
|
|
|
|
def insert(num):
|
|
for i in range(num):
|
|
day = random.randint(11, 30)
|
|
value = values.pop()
|
|
month = '0' + str(random.choice([3, 4]))
|
|
node1.query("INSERT INTO {} VALUES(toDate('2019-{m}-{d}'), {v})".format(name, m=month, d=day, v=value))
|
|
|
|
def alter_move(num):
|
|
for i in range(num):
|
|
produce_alter_move(node1, name)
|
|
|
|
def alter_modify(num):
|
|
for i in range(num):
|
|
column_type = random.choice(["UInt64", "String"])
|
|
try:
|
|
node1.query("ALTER TABLE {} MODIFY COLUMN number {}".format(name, column_type))
|
|
except:
|
|
if "Replicated" not in engine:
|
|
raise
|
|
|
|
insert(100)
|
|
|
|
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "100\n"
|
|
|
|
p = Pool(50)
|
|
tasks = []
|
|
for i in range(5):
|
|
tasks.append(p.apply_async(alter_move, (100,)))
|
|
tasks.append(p.apply_async(alter_modify, (100,)))
|
|
|
|
for task in tasks:
|
|
task.get(timeout=120)
|
|
|
|
assert node1.query("SELECT 1") == "1\n"
|
|
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "100\n"
|
|
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
|
|
|
|
|
def test_simple_replication_and_moves(start_cluster):
|
|
try:
|
|
for i, node in enumerate([node1, node2]):
|
|
node.query("""
|
|
CREATE TABLE replicated_table_for_moves (
|
|
s1 String
|
|
) ENGINE = ReplicatedMergeTree('/clickhouse/replicated_table_for_moves', '{}')
|
|
ORDER BY tuple()
|
|
SETTINGS storage_policy='moving_jbod_with_external', old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=2
|
|
""".format(i + 1))
|
|
|
|
def insert(num):
|
|
for i in range(num):
|
|
node = random.choice([node1, node2])
|
|
data = [] # 1MB in total
|
|
for i in range(2):
|
|
data.append(get_random_string(512 * 1024)) # 500KB value
|
|
node.query("INSERT INTO replicated_table_for_moves VALUES {}".format(
|
|
','.join(["('" + x + "')" for x in data])))
|
|
|
|
def optimize(num):
|
|
for i in range(num):
|
|
node = random.choice([node1, node2])
|
|
node.query("OPTIMIZE TABLE replicated_table_for_moves FINAL")
|
|
|
|
p = Pool(60)
|
|
tasks = []
|
|
tasks.append(p.apply_async(insert, (20,)))
|
|
tasks.append(p.apply_async(optimize, (20,)))
|
|
|
|
for task in tasks:
|
|
task.get(timeout=60)
|
|
|
|
node1.query("SYSTEM SYNC REPLICA replicated_table_for_moves", timeout=5)
|
|
node2.query("SYSTEM SYNC REPLICA replicated_table_for_moves", timeout=5)
|
|
|
|
node1.query("SELECT COUNT() FROM replicated_table_for_moves") == "40\n"
|
|
node2.query("SELECT COUNT() FROM replicated_table_for_moves") == "40\n"
|
|
|
|
data = [] # 1MB in total
|
|
for i in range(2):
|
|
data.append(get_random_string(512 * 1024)) # 500KB value
|
|
|
|
time.sleep(3) # wait until old parts will be deleted
|
|
node1.query("SYSTEM STOP MERGES")
|
|
node2.query("SYSTEM STOP MERGES")
|
|
|
|
node1.query(
|
|
"INSERT INTO replicated_table_for_moves VALUES {}".format(','.join(["('" + x + "')" for x in data])))
|
|
node2.query(
|
|
"INSERT INTO replicated_table_for_moves VALUES {}".format(','.join(["('" + x + "')" for x in data])))
|
|
|
|
time.sleep(3) # nothing was moved
|
|
|
|
disks1 = get_used_disks_for_table(node1, "replicated_table_for_moves")
|
|
disks2 = get_used_disks_for_table(node2, "replicated_table_for_moves")
|
|
|
|
node1.query("SYSTEM START MERGES")
|
|
node2.query("SYSTEM START MERGES")
|
|
|
|
set(disks1) == set(["jbod1", "external"])
|
|
set(disks2) == set(["jbod1", "external"])
|
|
finally:
|
|
for node in [node1, node2]:
|
|
node.query("DROP TABLE IF EXISTS replicated_table_for_moves")
|
|
|
|
|
|
def test_download_appropriate_disk(start_cluster):
|
|
try:
|
|
for i, node in enumerate([node1, node2]):
|
|
node.query("""
|
|
CREATE TABLE replicated_table_for_download (
|
|
s1 String
|
|
) ENGINE = ReplicatedMergeTree('/clickhouse/replicated_table_for_download', '{}')
|
|
ORDER BY tuple()
|
|
SETTINGS storage_policy='moving_jbod_with_external', old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=2
|
|
""".format(i + 1))
|
|
|
|
data = []
|
|
for i in range(50):
|
|
data.append(get_random_string(1024 * 1024)) # 1MB value
|
|
node1.query(
|
|
"INSERT INTO replicated_table_for_download VALUES {}".format(','.join(["('" + x + "')" for x in data])))
|
|
|
|
for _ in range(10):
|
|
try:
|
|
print "Syncing replica"
|
|
node2.query("SYSTEM SYNC REPLICA replicated_table_for_download")
|
|
break
|
|
except:
|
|
time.sleep(0.5)
|
|
|
|
disks2 = get_used_disks_for_table(node2, "replicated_table_for_download")
|
|
|
|
assert set(disks2) == set(["external"])
|
|
|
|
finally:
|
|
for node in [node1, node2]:
|
|
node.query("DROP TABLE IF EXISTS replicated_table_for_download")
|
|
|
|
|
|
def test_rename(start_cluster):
|
|
try:
|
|
node1.query("""
|
|
CREATE TABLE default.renaming_table (
|
|
s String
|
|
) ENGINE = MergeTree
|
|
ORDER BY tuple()
|
|
SETTINGS storage_policy='small_jbod_with_external'
|
|
""")
|
|
|
|
for _ in range(5):
|
|
data = []
|
|
for i in range(10):
|
|
data.append(get_random_string(1024 * 1024)) # 1MB value
|
|
node1.query("INSERT INTO renaming_table VALUES {}".format(','.join(["('" + x + "')" for x in data])))
|
|
|
|
disks = get_used_disks_for_table(node1, "renaming_table")
|
|
assert len(disks) > 1
|
|
assert node1.query("SELECT COUNT() FROM default.renaming_table") == "50\n"
|
|
|
|
node1.query("RENAME TABLE default.renaming_table TO default.renaming_table1")
|
|
assert node1.query("SELECT COUNT() FROM default.renaming_table1") == "50\n"
|
|
|
|
with pytest.raises(QueryRuntimeException):
|
|
node1.query("SELECT COUNT() FROM default.renaming_table")
|
|
|
|
node1.query("CREATE DATABASE IF NOT EXISTS test")
|
|
node1.query("RENAME TABLE default.renaming_table1 TO test.renaming_table2")
|
|
assert node1.query("SELECT COUNT() FROM test.renaming_table2") == "50\n"
|
|
|
|
with pytest.raises(QueryRuntimeException):
|
|
node1.query("SELECT COUNT() FROM default.renaming_table1")
|
|
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS default.renaming_table")
|
|
node1.query("DROP TABLE IF EXISTS default.renaming_table1")
|
|
node1.query("DROP TABLE IF EXISTS test.renaming_table2")
|
|
|
|
|
|
def test_freeze(start_cluster):
|
|
try:
|
|
node1.query("""
|
|
CREATE TABLE default.freezing_table (
|
|
d Date,
|
|
s String
|
|
) ENGINE = MergeTree
|
|
ORDER BY tuple()
|
|
PARTITION BY toYYYYMM(d)
|
|
SETTINGS storage_policy='small_jbod_with_external'
|
|
""")
|
|
|
|
for _ in range(5):
|
|
data = []
|
|
dates = []
|
|
for i in range(10):
|
|
data.append(get_random_string(1024 * 1024)) # 1MB value
|
|
dates.append("toDate('2019-03-05')")
|
|
node1.query("INSERT INTO freezing_table VALUES {}".format(
|
|
','.join(["(" + d + ", '" + s + "')" for d, s in zip(dates, data)])))
|
|
|
|
disks = get_used_disks_for_table(node1, "freezing_table")
|
|
assert len(disks) > 1
|
|
assert node1.query("SELECT COUNT() FROM default.freezing_table") == "50\n"
|
|
|
|
node1.query("ALTER TABLE freezing_table FREEZE PARTITION 201903")
|
|
# check shadow files (backups) exists
|
|
node1.exec_in_container(["bash", "-c", "find /jbod1/shadow -name '*.mrk2' | grep '.*'"])
|
|
node1.exec_in_container(["bash", "-c", "find /external/shadow -name '*.mrk2' | grep '.*'"])
|
|
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS default.freezing_table")
|
|
node1.exec_in_container(["rm", "-rf", "/jbod1/shadow", "/external/shadow"])
|
|
|
|
|
|
def test_kill_while_insert(start_cluster):
|
|
try:
|
|
name = "test_kill_while_insert"
|
|
|
|
node1.query("""
|
|
CREATE TABLE {name} (
|
|
s String
|
|
) ENGINE = MergeTree
|
|
ORDER BY tuple()
|
|
SETTINGS storage_policy='small_jbod_with_external'
|
|
""".format(name=name))
|
|
|
|
data = []
|
|
dates = []
|
|
for i in range(10):
|
|
data.append(get_random_string(1024 * 1024)) # 1MB value
|
|
node1.query("INSERT INTO {name} VALUES {}".format(','.join(["('" + s + "')" for s in data]), name=name))
|
|
|
|
disks = get_used_disks_for_table(node1, name)
|
|
assert set(disks) == {"jbod1"}
|
|
|
|
start_time = time.time()
|
|
long_select = threading.Thread(target=node1.query, args=("SELECT sleep(3) FROM {name}".format(name=name),))
|
|
long_select.start()
|
|
|
|
time.sleep(0.5)
|
|
|
|
node1.query("ALTER TABLE {name} MOVE PARTITION tuple() TO DISK 'external'".format(name=name))
|
|
assert time.time() - start_time < 2
|
|
node1.restart_clickhouse(kill=True)
|
|
|
|
try:
|
|
long_select.join()
|
|
except:
|
|
""""""
|
|
|
|
assert node1.query("SELECT count() FROM {name}".format(name=name)).splitlines() == ["10"]
|
|
|
|
finally:
|
|
try:
|
|
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
|
except:
|
|
"""ClickHouse may be inactive at this moment and we don't want to mask a meaningful exception."""
|
|
|
|
|
|
def test_move_while_merge(start_cluster):
|
|
try:
|
|
name = "test_move_while_merge"
|
|
|
|
node1.query("""
|
|
CREATE TABLE {name} (
|
|
n Int64
|
|
) ENGINE = MergeTree
|
|
ORDER BY sleep(2)
|
|
SETTINGS storage_policy='small_jbod_with_external'
|
|
""".format(name=name))
|
|
|
|
node1.query("INSERT INTO {name} VALUES (1)".format(name=name))
|
|
node1.query("INSERT INTO {name} VALUES (2)".format(name=name))
|
|
|
|
parts = node1.query(
|
|
"SELECT name FROM system.parts WHERE table = '{name}' AND active = 1".format(name=name)).splitlines()
|
|
assert len(parts) == 2
|
|
|
|
def optimize():
|
|
node1.query("OPTIMIZE TABLE {name}".format(name=name))
|
|
|
|
optimize = threading.Thread(target=optimize)
|
|
optimize.start()
|
|
|
|
time.sleep(0.5)
|
|
|
|
with pytest.raises(QueryRuntimeException):
|
|
node1.query("ALTER TABLE {name} MOVE PART '{part}' TO DISK 'external'".format(name=name, part=parts[0]))
|
|
|
|
exiting = False
|
|
no_exception = {}
|
|
|
|
def alter():
|
|
while not exiting:
|
|
try:
|
|
node1.query(
|
|
"ALTER TABLE {name} MOVE PART '{part}' TO DISK 'external'".format(name=name, part=parts[0]))
|
|
no_exception['missing'] = 'exception'
|
|
break
|
|
except QueryRuntimeException:
|
|
""""""
|
|
|
|
alter_thread = threading.Thread(target=alter)
|
|
alter_thread.start()
|
|
|
|
optimize.join()
|
|
|
|
time.sleep(0.5)
|
|
|
|
exiting = True
|
|
alter_thread.join()
|
|
assert len(no_exception) == 0
|
|
|
|
assert node1.query("SELECT count() FROM {name}".format(name=name)).splitlines() == ["2"]
|
|
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
|
|
|
|
|
def test_move_across_policies_does_not_work(start_cluster):
|
|
try:
|
|
name = "test_move_across_policies_does_not_work"
|
|
|
|
node1.query("""
|
|
CREATE TABLE {name} (
|
|
n Int64
|
|
) ENGINE = MergeTree
|
|
ORDER BY tuple()
|
|
SETTINGS storage_policy='jbods_with_external'
|
|
""".format(name=name))
|
|
|
|
node1.query("""
|
|
CREATE TABLE {name}2 (
|
|
n Int64
|
|
) ENGINE = MergeTree
|
|
ORDER BY tuple()
|
|
SETTINGS storage_policy='small_jbod_with_external'
|
|
""".format(name=name))
|
|
|
|
node1.query("""INSERT INTO {name} VALUES (1)""".format(name=name))
|
|
node1.query("""ALTER TABLE {name} MOVE PARTITION tuple() TO DISK 'jbod2'""".format(name=name))
|
|
|
|
with pytest.raises(QueryRuntimeException, match='.*because disk does not belong to storage policy.*'):
|
|
node1.query("""ALTER TABLE {name}2 ATTACH PARTITION tuple() FROM {name}""".format(name=name))
|
|
|
|
with pytest.raises(QueryRuntimeException, match='.*because disk does not belong to storage policy.*'):
|
|
node1.query("""ALTER TABLE {name}2 REPLACE PARTITION tuple() FROM {name}""".format(name=name))
|
|
|
|
with pytest.raises(QueryRuntimeException, match='.*should have the same storage policy of source table.*'):
|
|
node1.query("""ALTER TABLE {name} MOVE PARTITION tuple() TO TABLE {name}2""".format(name=name))
|
|
|
|
assert node1.query("""SELECT * FROM {name}""".format(name=name)).splitlines() == ["1"]
|
|
|
|
finally:
|
|
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
|
node1.query("DROP TABLE IF EXISTS {name}2".format(name=name))
|