ClickHouse/tests/integration/test_reloading_storage_configuration/test.py
2021-09-25 07:08:34 +03:00

555 lines
19 KiB
Python

import collections
import os
import re
import shutil
import time
import xml.etree.ElementTree as ET
import helpers.client
import helpers.cluster
from helpers.test_tools import TSV
import pytest
cluster = helpers.cluster.ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1',
main_configs=['configs/logs_config.xml'],
with_zookeeper=True,
stay_alive=True,
tmpfs=['/jbod1:size=40M', '/jbod2:size=40M', '/jbod3:size=40M', '/jbod4:size=40M',
'/external:size=200M'],
macros={"shard": 0, "replica": 1})
node2 = cluster.add_instance('node2',
main_configs=['configs/logs_config.xml'],
with_zookeeper=True,
stay_alive=True,
tmpfs=['/jbod1:size=40M', '/jbod2:size=40M', '/jbod3:size=40M', '/jbod4:size=40M',
'/external:size=200M'],
macros={"shard": 0, "replica": 2})
def get_log(node):
return node.exec_in_container(["bash", "-c", "cat /var/log/clickhouse-server/clickhouse-server.log"])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def start_over():
shutil.copy(os.path.join(os.path.dirname(__file__), "configs/config.d/storage_configuration.xml"),
os.path.join(node1.config_d_dir, "storage_configuration.xml"))
for node in (node1, node2):
separate_configuration_path = os.path.join(node.config_d_dir, "separate_configuration.xml")
try:
os.remove(separate_configuration_path)
except:
""""""
def add_disk(node, name, path, separate_file=False):
separate_configuration_path = os.path.join(node.config_d_dir, "separate_configuration.xml")
try:
if separate_file:
tree = ET.parse(separate_configuration_path)
else:
tree = ET.parse(os.path.join(node.config_d_dir, "storage_configuration.xml"))
except:
tree = ET.ElementTree(
ET.fromstring('<clickhouse><storage_configuration><disks/><policies/></storage_configuration></clickhouse>'))
root = tree.getroot()
new_disk = ET.Element(name)
new_path = ET.Element("path")
new_path.text = path
new_disk.append(new_path)
root.find("storage_configuration").find("disks").append(new_disk)
if separate_file:
tree.write(separate_configuration_path)
else:
tree.write(os.path.join(node.config_d_dir, "storage_configuration.xml"))
def update_disk(node, name, path, keep_free_space_bytes, separate_file=False):
separate_configuration_path = os.path.join(node.config_d_dir,
"separate_configuration.xml")
try:
if separate_file:
tree = ET.parse(separate_configuration_path)
else:
tree = ET.parse(
os.path.join(node.config_d_dir, "storage_configuration.xml"))
except:
tree = ET.ElementTree(
ET.fromstring('<clickhouse><storage_configuration><disks/><policies/></storage_configuration></clickhouse>'))
root = tree.getroot()
disk = root.find("storage_configuration").find("disks").find(name)
assert disk is not None
new_path = disk.find("path")
assert new_path is not None
new_path.text = path
new_keep_free_space_bytes = disk.find("keep_free_space_bytes")
assert new_keep_free_space_bytes is not None
new_keep_free_space_bytes.text = keep_free_space_bytes
if separate_file:
tree.write(separate_configuration_path)
else:
tree.write(os.path.join(node.config_d_dir, "storage_configuration.xml"))
def add_policy(node, name, volumes):
tree = ET.parse(os.path.join(node.config_d_dir, "storage_configuration.xml"))
root = tree.getroot()
new_policy = ET.Element(name)
new_volumes = ET.Element("volumes")
for volume, disks in list(volumes.items()):
new_volume = ET.Element(volume)
for disk in disks:
new_disk = ET.Element("disk")
new_disk.text = disk
new_volume.append(new_disk)
new_volumes.append(new_volume)
new_policy.append(new_volumes)
root.find("storage_configuration").find("policies").append(new_policy)
tree.write(os.path.join(node.config_d_dir, "storage_configuration.xml"))
def test_add_disk(started_cluster):
try:
name = "test_add_disk"
engine = "MergeTree()"
start_over()
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
assert "jbod3" not in set(node1.query("SELECT name FROM system.disks").splitlines())
add_disk(node1, "jbod3", "/jbod3/")
node1.query("SYSTEM RELOAD CONFIG")
assert "jbod3" in set(node1.query("SELECT name FROM system.disks").splitlines())
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_update_disk(started_cluster):
try:
name = "test_update_disk"
engine = "MergeTree()"
start_over()
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
assert node1.query("SELECT path, keep_free_space FROM system.disks where name = 'jbod2'") == TSV([
["/jbod2/", "10485760"]])
update_disk(node1, "jbod2", "/jbod2/", "20971520")
node1.query("SYSTEM RELOAD CONFIG")
assert node1.query("SELECT path, keep_free_space FROM system.disks where name = 'jbod2'") == TSV([
["/jbod2/", "20971520"]])
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_add_disk_to_separate_config(started_cluster):
try:
name = "test_add_disk"
engine = "MergeTree()"
start_over()
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
assert "jbod3" not in set(node1.query("SELECT name FROM system.disks").splitlines())
add_disk(node1, "jbod3", "/jbod3/", separate_file=True)
node1.query("SYSTEM RELOAD CONFIG")
assert "jbod3" in set(node1.query("SELECT name FROM system.disks").splitlines())
start_over()
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_add_policy(started_cluster):
try:
name = "test_add_policy"
engine = "MergeTree()"
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
add_policy(node1, "cool_policy", {"volume1": ["jbod3", "jbod4"]})
node1.query("SYSTEM RELOAD CONFIG")
disks = set(node1.query("SELECT name FROM system.disks").splitlines())
assert "cool_policy" in set(node1.query("SELECT policy_name FROM system.storage_policies").splitlines())
assert {"volume1"} == set(node1.query(
"SELECT volume_name FROM system.storage_policies WHERE policy_name = 'cool_policy'").splitlines())
assert {"['jbod3','jbod4']"} == set(
node1.query("SELECT disks FROM system.storage_policies WHERE policy_name = 'cool_policy'").splitlines())
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_new_policy_works(started_cluster):
try:
name = "test_new_policy_works"
engine = "MergeTree()"
start_over()
add_disk(node1, "jbod3", "/jbod3/")
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
add_policy(node1, "cool_policy", {"volume1": ["jbod3"]})
node1.query("SYSTEM RELOAD CONFIG")
# Incompatible storage policy.
with pytest.raises(helpers.client.QueryRuntimeException):
node1.query("""
ALTER TABLE {name} MODIFY SETTING storage_policy='cool_policy'
""".format(name=name))
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "cool_policy", collections.OrderedDict(
[("volume1", ["jbod3"]), ("main", ["jbod1", "jbod2"]), ("external", ["external"])]))
node1.query("SYSTEM RELOAD CONFIG")
node1.query("""
ALTER TABLE {name} MODIFY SETTING storage_policy='cool_policy'
""".format(name=name))
node1.query("""
INSERT INTO TABLE {name} VALUES (1)
""".format(name=name))
assert {"jbod3"} == set(node1.query(
"SELECT disk_name FROM system.parts WHERE active = 1 AND table = '{name}'".format(name=name)).splitlines())
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_add_volume_to_policy(started_cluster):
try:
name = "test_add_volume_to_policy"
engine = "MergeTree()"
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "cool_policy", {"volume1": ["jbod3"]})
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "cool_policy", collections.OrderedDict([("volume1", ["jbod3"]), ("volume2", ["jbod4"])]))
node1.query("SYSTEM RELOAD CONFIG")
volumes = set(node1.query(
"SELECT volume_name FROM system.storage_policies WHERE policy_name = 'cool_policy'").splitlines())
disks_sets = set(
node1.query("SELECT disks FROM system.storage_policies WHERE policy_name = 'cool_policy'").splitlines())
assert {"volume1", "volume2"} == volumes
assert {"['jbod3']", "['jbod4']"} == disks_sets
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_add_disk_to_policy(started_cluster):
try:
name = "test_add_disk_to_policy"
engine = "MergeTree()"
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "cool_policy", {"volume1": ["jbod3"]})
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "cool_policy", {"volume1": ["jbod3", "jbod4"]})
node1.query("SYSTEM RELOAD CONFIG")
volumes = set(node1.query(
"SELECT volume_name FROM system.storage_policies WHERE policy_name = 'cool_policy'").splitlines())
disks_sets = set(
node1.query("SELECT disks FROM system.storage_policies WHERE policy_name = 'cool_policy'").splitlines())
assert {"volume1"} == volumes
assert {"['jbod3','jbod4']"} == disks_sets
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_remove_disk(started_cluster):
try:
name = "test_remove_disk"
engine = "MergeTree()"
start_over()
add_disk(node1, "remove_disk_jbod3", "/jbod3/")
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
assert "remove_disk_jbod3" in set(node1.query("SELECT name FROM system.disks").splitlines())
start_over()
node1.query("SYSTEM RELOAD CONFIG")
assert "remove_disk_jbod3" in set(node1.query("SELECT name FROM system.disks").splitlines())
assert re.search("Warning.*remove_disk_jbod3", get_log(node1))
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_remove_policy(started_cluster):
try:
name = "test_remove_policy"
engine = "MergeTree()"
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "remove_policy_cool_policy", {"volume1": ["jbod3", "jbod4"]})
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
assert "remove_policy_cool_policy" in set(
node1.query("SELECT policy_name FROM system.storage_policies").splitlines())
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
node1.query("SYSTEM RELOAD CONFIG")
assert "remove_policy_cool_policy" in set(
node1.query("SELECT policy_name FROM system.storage_policies").splitlines())
assert re.search("Error.*remove_policy_cool_policy", get_log(node1))
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_remove_volume_from_policy(started_cluster):
try:
name = "test_remove_volume_from_policy"
engine = "MergeTree()"
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "test_remove_volume_from_policy_cool_policy",
collections.OrderedDict([("volume1", ["jbod3"]), ("volume2", ["jbod4"])]))
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
volumes = set(node1.query(
"SELECT volume_name FROM system.storage_policies WHERE policy_name = 'test_remove_volume_from_policy_cool_policy'").splitlines())
disks_sets = set(node1.query(
"SELECT disks FROM system.storage_policies WHERE policy_name = 'test_remove_volume_from_policy_cool_policy'").splitlines())
assert {"volume1", "volume2"} == volumes
assert {"['jbod3']", "['jbod4']"} == disks_sets
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "cool_policy", {"volume1": ["jbod3"]})
node1.query("SYSTEM RELOAD CONFIG")
volumes = set(node1.query(
"SELECT volume_name FROM system.storage_policies WHERE policy_name = 'test_remove_volume_from_policy_cool_policy'").splitlines())
disks_sets = set(node1.query(
"SELECT disks FROM system.storage_policies WHERE policy_name = 'test_remove_volume_from_policy_cool_policy'").splitlines())
assert {"volume1", "volume2"} == volumes
assert {"['jbod3']", "['jbod4']"} == disks_sets
assert re.search("Error.*test_remove_volume_from_policy_cool_policy", get_log(node1))
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_remove_disk_from_policy(started_cluster):
try:
name = "test_remove_disk_from_policy"
engine = "MergeTree()"
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "test_remove_disk_from_policy_cool_policy", {"volume1": ["jbod3", "jbod4"]})
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
volumes = set(node1.query(
"SELECT volume_name FROM system.storage_policies WHERE policy_name = 'test_remove_disk_from_policy_cool_policy'").splitlines())
disks_sets = set(node1.query(
"SELECT disks FROM system.storage_policies WHERE policy_name = 'test_remove_disk_from_policy_cool_policy'").splitlines())
assert {"volume1"} == volumes
assert {"['jbod3','jbod4']"} == disks_sets
start_over()
add_disk(node1, "jbod3", "/jbod3/")
add_disk(node1, "jbod4", "/jbod4/")
add_policy(node1, "cool_policy", {"volume1": ["jbod3"]})
node1.query("SYSTEM RELOAD CONFIG")
volumes = set(node1.query(
"SELECT volume_name FROM system.storage_policies WHERE policy_name = 'test_remove_disk_from_policy_cool_policy'").splitlines())
disks_sets = set(node1.query(
"SELECT disks FROM system.storage_policies WHERE policy_name = 'test_remove_disk_from_policy_cool_policy'").splitlines())
assert {"volume1"} == volumes
assert {"['jbod3','jbod4']"} == disks_sets
assert re.search("Error.*test_remove_disk_from_policy_cool_policy", get_log(node1))
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""