ClickHouse/tests/integration/test_disk_configuration/test.py

299 lines
9.3 KiB
Python

import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
# Module-wide cluster handle; its instances are registered and started by the
# `start_cluster` fixture below and looked up by the tests via `cluster.instances`.
cluster = ClickHouseCluster(__file__)
# Base name for the tables created by the tests in this module
# (suffixes such as `_2` are appended for additional tables).
TABLE_NAME = "test"
@pytest.fixture(scope="module")
def start_cluster():
    """Start the two-node cluster once per module and shut it down afterwards.

    Registers ``node1`` and ``node2`` with the same config files, with
    ZooKeeper and MinIO enabled, differing only in the ``replica`` macro.
    Yields the started cluster; ``cluster.shutdown()`` always runs on exit.
    """
    shared_configs = (
        "configs/config.d/storage_configuration.xml",
        "configs/config.d/remote_servers.xml",
    )
    try:
        for node_name in ("node1", "node2"):
            cluster.add_instance(
                node_name,
                # Fresh list per instance, exactly as separate literals would give.
                main_configs=list(shared_configs),
                with_zookeeper=True,
                stay_alive=True,
                with_minio=True,
                macros={"replica": node_name, "shard": "shard1"},
            )
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
def test_merge_tree_disk_setting(start_cluster):
    """Check the MergeTree ``disk`` setting when it names an existing disk.

    Steps:
      * ``disk`` and ``storage_policy`` together must be rejected;
      * tables created with ``disk = 's3'`` write objects under ``data/``
        in the MinIO bucket and report a ``__s3`` storage policy;
      * the policy survives ``SYSTEM RELOAD CONFIG`` and a server restart;
      * finally the tables and the remaining S3 objects are removed.

    The disks ``s3`` and ``disk_local`` are referenced by name only; they are
    presumably declared in ``configs/config.d/storage_configuration.xml``.
    """
    node1 = cluster.instances["node1"]
    minio = cluster.minio_client
    second_table = f"{TABLE_NAME}_2"

    def s3_objects():
        # Materialized so the result can be counted, or iterated while deleting.
        return list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))

    def create_on_s3_disk(table):
        node1.query(
            f"""
            DROP TABLE IF EXISTS {table};
            CREATE TABLE {table} (a Int32)
            ENGINE = MergeTree()
            ORDER BY tuple()
            SETTINGS disk = 's3';
            """
        )

    def insert_100_rows(table):
        node1.query(f"INSERT INTO {table} SELECT number FROM numbers(100)")
        assert int(node1.query(f"SELECT count() FROM {table}")) == 100

    def storage_policy_of(table):
        return node1.query(
            f"SELECT storage_policy FROM system.tables WHERE name = '{table}'"
        ).strip()

    def disks_of_s3_policy():
        return node1.query(
            "SELECT disks FROM system.storage_policies WHERE policy_name = '__s3'"
        ).strip()

    # Specifying both settings at once is an error.
    conflict_error = node1.query_and_get_error(
        f"""
        DROP TABLE IF EXISTS {TABLE_NAME};
        CREATE TABLE {TABLE_NAME} (a Int32)
        ENGINE = MergeTree()
        ORDER BY tuple()
        SETTINGS disk = 'disk_local', storage_policy = 's3';
        """
    )
    assert (
        "MergeTree settings `storage_policy` and `disk` cannot be specified at the same time"
        in conflict_error
    )

    # Data inserted into a table on the s3 disk must show up in the bucket.
    create_on_s3_disk(TABLE_NAME)
    objects_before = len(s3_objects())
    insert_100_rows(TABLE_NAME)
    assert len(s3_objects()) > objects_before

    create_on_s3_disk(second_table)
    insert_100_rows(second_table)

    # Both tables report the policy named "__s3".
    assert "__s3" in storage_policy_of(TABLE_NAME)
    assert "__s3" in storage_policy_of(second_table)

    # Reloading the config must not break that policy.
    node1.query("SYSTEM RELOAD CONFIG")
    assert not node1.contains_in_log(
        "An error has occurred while reloading storage policies, storage policies were not applied"
    )
    assert "['s3']" in disks_of_s3_policy()

    # The same policy name and data must be present after a restart.
    # Checked with the full "__s3" name, consistent with the assertions above
    # and with the policy_name queried from system.storage_policies.
    node1.restart_clickhouse()
    assert "__s3" in storage_policy_of(TABLE_NAME)
    assert "['s3']" in disks_of_s3_policy()
    assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}")) == 100

    # Cleanup: drop the tables and remove whatever is left under data/.
    node1.query(f"DROP TABLE {TABLE_NAME} SYNC")
    node1.query(f"DROP TABLE {second_table} SYNC")
    for obj in s3_objects():
        minio.remove_object(cluster.minio_bucket, obj.object_name)
def test_merge_tree_custom_disk_setting(start_cluster):
    """Check the MergeTree ``disk`` setting with an inline ``disk(...)`` definition.

    Steps:
      * tables whose inline S3 disk points at ``root/data/`` write objects
        under ``data/``; a disk pointing at ``root/data2/`` writes under
        ``data2/`` and leaves ``data/`` untouched;
      * the table stays usable after DETACH/ATTACH, a server restart and
        ``SYSTEM RELOAD CONFIG``;
      * a ReplicatedMergeTree created ``ON CLUSTER`` keeps the disk definition
        in ``SHOW CREATE TABLE`` on both nodes (also after restart) and both
        replicas report the same storage policy and disks;
      * finally all tables and S3 objects created here are removed so the
        test leaves no state behind and can be re-run on the same cluster.
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]
    minio = cluster.minio_client
    replicated_table = f"{TABLE_NAME}_4"

    def s3_objects(prefix):
        # Materialized so the result can be counted, or iterated while deleting.
        return list(minio.list_objects(cluster.minio_bucket, prefix, recursive=True))

    def purge_s3(prefix):
        for obj in s3_objects(prefix):
            minio.remove_object(cluster.minio_bucket, obj.object_name)

    def create_on_custom_disk(table, s3_path):
        node1.query(
            f"""
            DROP TABLE IF EXISTS {table};
            CREATE TABLE {table} (a Int32)
            ENGINE = MergeTree()
            ORDER BY tuple()
            SETTINGS
                disk = disk(
                    type=s3,
                    endpoint='http://minio1:9001/root/{s3_path}/',
                    access_key_id='minio',
                    secret_access_key='minio123');
            """
        )

    def row_count(table):
        return int(node1.query(f"SELECT count() FROM {table}"))

    def insert_100_rows(table, expected_total):
        node1.query(f"INSERT INTO {table} SELECT number FROM numbers(100)")
        assert row_count(table) == expected_total

    def storage_policy_of(node, table):
        return node.query(
            f"SELECT storage_policy FROM system.tables WHERE name = '{table}'"
        ).strip()

    def disks_of_policy(node, policy):
        return node.query(
            f"SELECT disks FROM system.storage_policies WHERE policy_name = '{policy}'"
        ).strip()

    # Check that data was indeed created on s3 with the needed path in s3
    create_on_custom_disk(TABLE_NAME, "data")
    objects_before = len(s3_objects("data/"))
    insert_100_rows(TABLE_NAME, 100)
    assert len(s3_objects("data/")) > objects_before

    # Check that data for the second table was created on the same disk on the same path
    create_on_custom_disk(f"{TABLE_NAME}_2", "data")
    objects_before = len(s3_objects("data/"))
    insert_100_rows(f"{TABLE_NAME}_2", 100)
    assert len(s3_objects("data/")) > objects_before

    # Check that data for a disk with a different path was created on the different path
    purge_s3("data2/")
    create_on_custom_disk(f"{TABLE_NAME}_3", "data2")
    objects_before = len(s3_objects("data/"))
    insert_100_rows(f"{TABLE_NAME}_3", 100)
    assert len(s3_objects("data/")) == objects_before
    assert len(s3_objects("data2/")) > 0

    # check DETACH ATTACH
    node1.query(f"DETACH TABLE {TABLE_NAME}")
    node1.query(f"ATTACH TABLE {TABLE_NAME}")
    insert_100_rows(TABLE_NAME, 200)

    # check after server restart the same disk path is used with the same metadata
    node1.restart_clickhouse()
    insert_100_rows(TABLE_NAME, 300)

    # check reload config does not wipe custom disk
    node1.query("SYSTEM RELOAD CONFIG")
    assert not node1.contains_in_log(
        "disappeared from configuration, this change will be applied after restart of ClickHouse"
    )
    assert row_count(TABLE_NAME) == 300

    # check replicated merge tree on cluster
    # `{{replica}}` renders as the literal text {replica} in the query, so the
    # placeholder is sent to the server unexpanded by Python.
    node1.query(
        f"""
        DROP TABLE IF EXISTS {replicated_table};
        CREATE TABLE {replicated_table} ON CLUSTER 'cluster' (a Int32)
        ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{{replica}}')
        ORDER BY tuple()
        SETTINGS
            disk = disk(
                type=s3,
                endpoint='http://minio1:9001/root/data2/',
                access_key_id='minio',
                secret_access_key='minio123');
        """
    )

    # Fragment expected in SHOW CREATE TABLE output; quotes appear backslash-escaped there.
    expected_settings = (
        "SETTINGS disk = disk(type = s3, "
        "endpoint = \\'http://minio1:9001/root/data2/\\', "
        "access_key_id = \\'minio\\', "
        "secret_access_key = \\'minio123\\'), "
        "index_granularity = 8192"
    )

    def assert_disk_in_show_create():
        for node in (node1, node2):
            assert (
                expected_settings
                in node.query(f"SHOW CREATE TABLE {replicated_table}").strip()
            )

    assert_disk_in_show_create()
    node1.restart_clickhouse()
    node2.restart_clickhouse()
    assert_disk_in_show_create()

    # check that disk names are the same for all replicas
    policy1 = storage_policy_of(node1, replicated_table)
    policy2 = storage_policy_of(node2, replicated_table)
    assert policy1 == policy2
    assert disks_of_policy(node1, policy1) == disks_of_policy(node2, policy2)

    # Cleanup: drop every table created above (the replicated one on both
    # nodes) and remove leftover objects, so nothing leaks into a later run.
    for suffix in ("", "_2", "_3"):
        node1.query(f"DROP TABLE {TABLE_NAME}{suffix} SYNC")
    node1.query(f"DROP TABLE {replicated_table} ON CLUSTER 'cluster' SYNC")
    for prefix in ("data/", "data2/"):
        purge_s3(prefix)