mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-05 14:02:21 +00:00
299 lines
9.3 KiB
Python
299 lines
9.3 KiB
Python
import pytest
|
|
from helpers.client import QueryRuntimeException
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
|
|
TABLE_NAME = "test"
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def start_cluster():
|
|
try:
|
|
cluster.add_instance(
|
|
"node1",
|
|
main_configs=[
|
|
"configs/config.d/storage_configuration.xml",
|
|
"configs/config.d/remote_servers.xml",
|
|
],
|
|
with_zookeeper=True,
|
|
stay_alive=True,
|
|
with_minio=True,
|
|
macros={"replica": "node1", "shard": "shard1"},
|
|
)
|
|
cluster.add_instance(
|
|
"node2",
|
|
main_configs=[
|
|
"configs/config.d/storage_configuration.xml",
|
|
"configs/config.d/remote_servers.xml",
|
|
],
|
|
with_zookeeper=True,
|
|
stay_alive=True,
|
|
with_minio=True,
|
|
macros={"replica": "node2", "shard": "shard1"},
|
|
)
|
|
|
|
cluster.start()
|
|
yield cluster
|
|
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def test_merge_tree_disk_setting(start_cluster):
|
|
node1 = cluster.instances["node1"]
|
|
|
|
assert (
|
|
"MergeTree settings `storage_policy` and `disk` cannot be specified at the same time"
|
|
in node1.query_and_get_error(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME};
|
|
CREATE TABLE {TABLE_NAME} (a Int32)
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple()
|
|
SETTINGS disk = 'disk_local', storage_policy = 's3';
|
|
"""
|
|
)
|
|
)
|
|
|
|
node1.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME};
|
|
CREATE TABLE {TABLE_NAME} (a Int32)
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple()
|
|
SETTINGS disk = 's3';
|
|
"""
|
|
)
|
|
|
|
minio = cluster.minio_client
|
|
count = len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
|
|
|
|
node1.query(f"INSERT INTO {TABLE_NAME} SELECT number FROM numbers(100)")
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
|
|
assert (
|
|
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
|
|
> count
|
|
)
|
|
|
|
node1.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME}_2;
|
|
CREATE TABLE {TABLE_NAME}_2 (a Int32)
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple()
|
|
SETTINGS disk = 's3';
|
|
"""
|
|
)
|
|
|
|
node1.query(f"INSERT INTO {TABLE_NAME}_2 SELECT number FROM numbers(100)")
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}_2")) == 100
|
|
|
|
assert (
|
|
"__s3"
|
|
in node1.query(
|
|
f"SELECT storage_policy FROM system.tables WHERE name = '{TABLE_NAME}'"
|
|
).strip()
|
|
)
|
|
assert (
|
|
"__s3"
|
|
in node1.query(
|
|
f"SELECT storage_policy FROM system.tables WHERE name = '{TABLE_NAME}_2'"
|
|
).strip()
|
|
)
|
|
|
|
node1.query("SYSTEM RELOAD CONFIG")
|
|
assert not node1.contains_in_log(
|
|
"An error has occurred while reloading storage policies, storage policies were not applied"
|
|
)
|
|
assert (
|
|
"['s3']"
|
|
in node1.query(
|
|
"SELECT disks FROM system.storage_policies WHERE policy_name = '__s3'"
|
|
).strip()
|
|
)
|
|
|
|
node1.restart_clickhouse()
|
|
|
|
assert (
|
|
"_s3"
|
|
in node1.query(
|
|
f"SELECT storage_policy FROM system.tables WHERE name = '{TABLE_NAME}'"
|
|
).strip()
|
|
)
|
|
assert (
|
|
"['s3']"
|
|
in node1.query(
|
|
"SELECT disks FROM system.storage_policies WHERE policy_name = '__s3'"
|
|
).strip()
|
|
)
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
|
|
|
|
node1.query(f"DROP TABLE {TABLE_NAME} SYNC")
|
|
node1.query(f"DROP TABLE {TABLE_NAME}_2 SYNC")
|
|
|
|
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
|
|
minio.remove_object(cluster.minio_bucket, obj.object_name)
|
|
|
|
|
|
def test_merge_tree_custom_disk_setting(start_cluster):
|
|
node1 = cluster.instances["node1"]
|
|
node2 = cluster.instances["node2"]
|
|
|
|
node1.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME};
|
|
CREATE TABLE {TABLE_NAME} (a Int32)
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple()
|
|
SETTINGS
|
|
disk = disk(
|
|
type=s3,
|
|
endpoint='http://minio1:9001/root/data/',
|
|
access_key_id='minio',
|
|
secret_access_key='minio123');
|
|
"""
|
|
)
|
|
|
|
# Check that data was indeed created on s3 with the needed path in s3
|
|
|
|
minio = cluster.minio_client
|
|
count = len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
|
|
|
|
node1.query(f"INSERT INTO {TABLE_NAME} SELECT number FROM numbers(100)")
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
|
|
assert (
|
|
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
|
|
> count
|
|
)
|
|
|
|
# Check that data for the second table was created on the same disk on the same path
|
|
|
|
node1.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME}_2;
|
|
CREATE TABLE {TABLE_NAME}_2 (a Int32)
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple()
|
|
SETTINGS
|
|
disk = disk(
|
|
type=s3,
|
|
endpoint='http://minio1:9001/root/data/',
|
|
access_key_id='minio',
|
|
secret_access_key='minio123');
|
|
"""
|
|
)
|
|
|
|
count = len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
|
|
node1.query(f"INSERT INTO {TABLE_NAME}_2 SELECT number FROM numbers(100)")
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}_2")) == 100
|
|
assert (
|
|
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
|
|
> count
|
|
)
|
|
|
|
# Check that data for a disk with a different path was created on the different path
|
|
|
|
for obj in list(minio.list_objects(cluster.minio_bucket, "data2/", recursive=True)):
|
|
minio.remove_object(cluster.minio_bucket, obj.object_name)
|
|
|
|
node1.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME}_3;
|
|
CREATE TABLE {TABLE_NAME}_3 (a Int32)
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple()
|
|
SETTINGS
|
|
disk = disk(
|
|
type=s3,
|
|
endpoint='http://minio1:9001/root/data2/',
|
|
access_key_id='minio',
|
|
secret_access_key='minio123');
|
|
"""
|
|
)
|
|
count = len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
|
|
node1.query(f"INSERT INTO {TABLE_NAME}_3 SELECT number FROM numbers(100)")
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}_3")) == 100
|
|
assert (
|
|
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
|
|
== count
|
|
)
|
|
assert (
|
|
len(list(minio.list_objects(cluster.minio_bucket, "data2/", recursive=True)))
|
|
> 0
|
|
)
|
|
|
|
# check DETACH ATTACH
|
|
|
|
node1.query(f"DETACH TABLE {TABLE_NAME}")
|
|
node1.query(f"ATTACH TABLE {TABLE_NAME}")
|
|
|
|
node1.query(f"INSERT INTO {TABLE_NAME} SELECT number FROM numbers(100)")
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}")) == 200
|
|
|
|
# check after server restart the same disk path is used with the same metadata
|
|
|
|
node1.restart_clickhouse()
|
|
|
|
node1.query(f"INSERT INTO {TABLE_NAME} SELECT number FROM numbers(100)")
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}")) == 300
|
|
|
|
# check reload config does not wipe custom disk
|
|
|
|
node1.query("SYSTEM RELOAD CONFIG")
|
|
assert not node1.contains_in_log(
|
|
"disappeared from configuration, this change will be applied after restart of ClickHouse"
|
|
)
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}")) == 300
|
|
|
|
# check replicated merge tree on cluster
|
|
|
|
replica = "{replica}"
|
|
node1.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME}_4;
|
|
CREATE TABLE {TABLE_NAME}_4 ON CLUSTER 'cluster' (a Int32)
|
|
ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')
|
|
ORDER BY tuple()
|
|
SETTINGS
|
|
disk = disk(
|
|
type=s3,
|
|
endpoint='http://minio1:9001/root/data2/',
|
|
access_key_id='minio',
|
|
secret_access_key='minio123');
|
|
"""
|
|
)
|
|
|
|
expected = """
|
|
SETTINGS disk = disk(type = s3, endpoint = \\'http://minio1:9001/root/data2/\\', access_key_id = \\'minio\\', secret_access_key = \\'minio123\\'), index_granularity = 8192
|
|
"""
|
|
|
|
assert expected.strip() in node1.query(f"SHOW CREATE TABLE {TABLE_NAME}_4").strip()
|
|
assert expected.strip() in node2.query(f"SHOW CREATE TABLE {TABLE_NAME}_4").strip()
|
|
|
|
node1.restart_clickhouse()
|
|
node2.restart_clickhouse()
|
|
|
|
assert expected.strip() in node1.query(f"SHOW CREATE TABLE {TABLE_NAME}_4").strip()
|
|
assert expected.strip() in node2.query(f"SHOW CREATE TABLE {TABLE_NAME}_4").strip()
|
|
|
|
# check that disk names are the same for all replicas
|
|
|
|
policy1 = node1.query(
|
|
f"SELECT storage_policy FROM system.tables WHERE name = '{TABLE_NAME}_4'"
|
|
).strip()
|
|
|
|
policy2 = node2.query(
|
|
f"SELECT storage_policy FROM system.tables WHERE name = '{TABLE_NAME}_4'"
|
|
).strip()
|
|
|
|
assert policy1 == policy2
|
|
assert (
|
|
node1.query(
|
|
f"SELECT disks FROM system.storage_policies WHERE policy_name = '{policy1}'"
|
|
).strip()
|
|
== node2.query(
|
|
f"SELECT disks FROM system.storage_policies WHERE policy_name = '{policy2}'"
|
|
).strip()
|
|
)
|