mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
439 lines
14 KiB
Python
439 lines
14 KiB
Python
import pytest
|
|
from helpers.client import QueryRuntimeException
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
|
|
TABLE_NAME = "test"
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def start_cluster():
|
|
try:
|
|
cluster.add_instance(
|
|
"node1",
|
|
main_configs=[
|
|
"configs/config.d/storage_configuration.xml",
|
|
"configs/config.d/remote_servers.xml",
|
|
],
|
|
with_zookeeper=True,
|
|
stay_alive=True,
|
|
with_minio=True,
|
|
macros={"replica": "node1", "shard": "shard1"},
|
|
)
|
|
cluster.add_instance(
|
|
"node2",
|
|
main_configs=[
|
|
"configs/config.d/storage_configuration.xml",
|
|
"configs/config.d/remote_servers.xml",
|
|
],
|
|
with_zookeeper=True,
|
|
stay_alive=True,
|
|
with_minio=True,
|
|
macros={"replica": "node2", "shard": "shard1"},
|
|
)
|
|
cluster.add_instance(
|
|
"node3",
|
|
main_configs=[
|
|
"configs/config.d/storage_configuration.xml",
|
|
"configs/config.d/remote_servers.xml",
|
|
"configs/config.d/mergetree_settings.xml",
|
|
],
|
|
stay_alive=True,
|
|
with_minio=True,
|
|
)
|
|
|
|
cluster.start()
|
|
yield cluster
|
|
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def test_merge_tree_disk_setting(start_cluster):
|
|
node1 = cluster.instances["node1"]
|
|
|
|
node1.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME};
|
|
CREATE TABLE {TABLE_NAME} (a Int32)
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple()
|
|
SETTINGS disk = 's3';
|
|
"""
|
|
)
|
|
|
|
minio = cluster.minio_client
|
|
count = len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
|
|
|
|
node1.query(f"INSERT INTO {TABLE_NAME} SELECT number FROM numbers(100)")
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
|
|
assert (
|
|
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
|
|
> count
|
|
)
|
|
|
|
node1.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME}_2;
|
|
CREATE TABLE {TABLE_NAME}_2 (a Int32)
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple()
|
|
SETTINGS disk = 's3';
|
|
"""
|
|
)
|
|
|
|
node1.query(f"INSERT INTO {TABLE_NAME}_2 SELECT number FROM numbers(100)")
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}_2")) == 100
|
|
|
|
assert (
|
|
"__s3"
|
|
in node1.query(
|
|
f"SELECT storage_policy FROM system.tables WHERE name = '{TABLE_NAME}'"
|
|
).strip()
|
|
)
|
|
assert (
|
|
"__s3"
|
|
in node1.query(
|
|
f"SELECT storage_policy FROM system.tables WHERE name = '{TABLE_NAME}_2'"
|
|
).strip()
|
|
)
|
|
|
|
node1.query("SYSTEM RELOAD CONFIG")
|
|
assert not node1.contains_in_log(
|
|
"An error has occurred while reloading storage policies, storage policies were not applied"
|
|
)
|
|
assert (
|
|
"['s3']"
|
|
in node1.query(
|
|
"SELECT disks FROM system.storage_policies WHERE policy_name = '__s3'"
|
|
).strip()
|
|
)
|
|
|
|
node1.restart_clickhouse()
|
|
|
|
assert (
|
|
"_s3"
|
|
in node1.query(
|
|
f"SELECT storage_policy FROM system.tables WHERE name = '{TABLE_NAME}'"
|
|
).strip()
|
|
)
|
|
assert (
|
|
"['s3']"
|
|
in node1.query(
|
|
"SELECT disks FROM system.storage_policies WHERE policy_name = '__s3'"
|
|
).strip()
|
|
)
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
|
|
|
|
node1.query(f"DROP TABLE {TABLE_NAME} SYNC")
|
|
node1.query(f"DROP TABLE {TABLE_NAME}_2 SYNC")
|
|
|
|
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
|
|
minio.remove_object(cluster.minio_bucket, obj.object_name)
|
|
|
|
|
|
def test_merge_tree_custom_disk_setting(start_cluster):
|
|
node1 = cluster.instances["node1"]
|
|
node2 = cluster.instances["node2"]
|
|
|
|
node1.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME};
|
|
CREATE TABLE {TABLE_NAME} (a Int32)
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple()
|
|
SETTINGS
|
|
disk = disk(
|
|
type=s3,
|
|
endpoint='http://minio1:9001/root/data/',
|
|
access_key_id='minio',
|
|
secret_access_key='minio123');
|
|
"""
|
|
)
|
|
|
|
# Check that data was indeed created on s3 with the needed path in s3
|
|
|
|
minio = cluster.minio_client
|
|
count = len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
|
|
|
|
node1.query(f"INSERT INTO {TABLE_NAME} SELECT number FROM numbers(100)")
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
|
|
assert (
|
|
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
|
|
> count
|
|
)
|
|
|
|
# Check that data for the second table was created on the same disk on the same path
|
|
|
|
node1.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME}_2;
|
|
CREATE TABLE {TABLE_NAME}_2 (a Int32)
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple()
|
|
SETTINGS
|
|
disk = disk(
|
|
type=s3,
|
|
endpoint='http://minio1:9001/root/data/',
|
|
access_key_id='minio',
|
|
secret_access_key='minio123');
|
|
"""
|
|
)
|
|
|
|
count = len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
|
|
node1.query(f"INSERT INTO {TABLE_NAME}_2 SELECT number FROM numbers(100)")
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}_2")) == 100
|
|
assert (
|
|
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
|
|
> count
|
|
)
|
|
|
|
# Check that data for a disk with a different path was created on the different path
|
|
|
|
for obj in list(minio.list_objects(cluster.minio_bucket, "data2/", recursive=True)):
|
|
minio.remove_object(cluster.minio_bucket, obj.object_name)
|
|
|
|
node1.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME}_3;
|
|
CREATE TABLE {TABLE_NAME}_3 (a Int32)
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple()
|
|
SETTINGS
|
|
disk = disk(
|
|
type=s3,
|
|
endpoint='http://minio1:9001/root/data2/',
|
|
access_key_id='minio',
|
|
secret_access_key='minio123');
|
|
"""
|
|
)
|
|
|
|
list1 = list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
|
|
count1 = len(list1)
|
|
|
|
node1.query(f"INSERT INTO {TABLE_NAME}_3 SELECT number FROM numbers(100)")
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}_3")) == 100
|
|
|
|
list2 = list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
|
|
count2 = len(list2)
|
|
|
|
if count1 != count2:
|
|
print("list1: ", list1)
|
|
print("list2: ", list2)
|
|
|
|
assert count1 == count2
|
|
assert (
|
|
len(list(minio.list_objects(cluster.minio_bucket, "data2/", recursive=True)))
|
|
> 0
|
|
)
|
|
|
|
# check DETACH ATTACH
|
|
|
|
node1.query(f"DETACH TABLE {TABLE_NAME}")
|
|
node1.query(f"ATTACH TABLE {TABLE_NAME}")
|
|
|
|
node1.query(f"INSERT INTO {TABLE_NAME} SELECT number FROM numbers(100)")
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}")) == 200
|
|
|
|
# check after server restart the same disk path is used with the same metadata
|
|
|
|
node1.restart_clickhouse()
|
|
|
|
node1.query(f"INSERT INTO {TABLE_NAME} SELECT number FROM numbers(100)")
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}")) == 300
|
|
|
|
# check reload config does not wipe custom disk
|
|
|
|
node1.query("SYSTEM RELOAD CONFIG")
|
|
assert not node1.contains_in_log(
|
|
"disappeared from configuration, this change will be applied after restart of ClickHouse"
|
|
)
|
|
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}")) == 300
|
|
|
|
# check replicated merge tree on cluster
|
|
|
|
replica = "{replica}"
|
|
node1.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME}_4;
|
|
CREATE TABLE {TABLE_NAME}_4 ON CLUSTER 'cluster' (a Int32)
|
|
ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')
|
|
ORDER BY tuple()
|
|
SETTINGS
|
|
disk = disk(
|
|
name='test_name',
|
|
type=s3,
|
|
endpoint='http://minio1:9001/root/data2/',
|
|
access_key_id='minio',
|
|
secret_access_key='minio123');
|
|
"""
|
|
)
|
|
|
|
expected = """
|
|
SETTINGS disk = disk(name = \\'test_name\\', type = s3, endpoint = \\'[HIDDEN]\\', access_key_id = \\'[HIDDEN]\\', secret_access_key = \\'[HIDDEN]\\'), index_granularity = 8192
|
|
"""
|
|
|
|
assert expected.strip() in node1.query(f"SHOW CREATE TABLE {TABLE_NAME}_4").strip()
|
|
assert expected.strip() in node2.query(f"SHOW CREATE TABLE {TABLE_NAME}_4").strip()
|
|
|
|
node1.restart_clickhouse()
|
|
node2.restart_clickhouse()
|
|
|
|
assert expected.strip() in node1.query(f"SHOW CREATE TABLE {TABLE_NAME}_4").strip()
|
|
assert expected.strip() in node2.query(f"SHOW CREATE TABLE {TABLE_NAME}_4").strip()
|
|
|
|
# check that disk names are the same for all replicas
|
|
|
|
policy1 = node1.query(
|
|
f"SELECT storage_policy FROM system.tables WHERE name = '{TABLE_NAME}_4'"
|
|
).strip()
|
|
|
|
policy2 = node2.query(
|
|
f"SELECT storage_policy FROM system.tables WHERE name = '{TABLE_NAME}_4'"
|
|
).strip()
|
|
|
|
assert policy1 == policy2
|
|
assert (
|
|
node1.query(
|
|
f"SELECT disks FROM system.storage_policies WHERE policy_name = '{policy1}'"
|
|
).strip()
|
|
== node2.query(
|
|
f"SELECT disks FROM system.storage_policies WHERE policy_name = '{policy2}'"
|
|
).strip()
|
|
)
|
|
|
|
node1.query(f"DROP TABLE {TABLE_NAME} SYNC")
|
|
node1.query(f"DROP TABLE {TABLE_NAME}_2 SYNC")
|
|
node1.query(f"DROP TABLE {TABLE_NAME}_3 SYNC")
|
|
node1.query(f"DROP TABLE {TABLE_NAME}_4 SYNC")
|
|
node2.query(f"DROP TABLE {TABLE_NAME}_4 SYNC")
|
|
|
|
|
|
def test_merge_tree_nested_custom_disk_setting(start_cluster):
|
|
node = cluster.instances["node1"]
|
|
|
|
minio = cluster.minio_client
|
|
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
|
|
minio.remove_object(cluster.minio_bucket, obj.object_name)
|
|
assert (
|
|
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
|
|
== 0
|
|
)
|
|
|
|
node.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME} SYNC;
|
|
CREATE TABLE {TABLE_NAME} (a Int32)
|
|
ENGINE = MergeTree() order by tuple()
|
|
SETTINGS disk = disk(
|
|
type=cache,
|
|
max_size='1Gi',
|
|
path='/var/lib/clickhouse/custom_disk_cache/',
|
|
disk=disk(
|
|
type=s3,
|
|
endpoint='http://minio1:9001/root/data/',
|
|
access_key_id='minio',
|
|
secret_access_key='minio123'));
|
|
"""
|
|
)
|
|
|
|
node.query(f"INSERT INTO {TABLE_NAME} SELECT number FROM numbers(100)")
|
|
node.query("SYSTEM DROP FILESYSTEM CACHE")
|
|
|
|
# Check cache is filled
|
|
assert 0 == int(node.query("SELECT count() FROM system.filesystem_cache"))
|
|
assert 100 == int(node.query(f"SELECT count() FROM {TABLE_NAME}"))
|
|
node.query(f"SELECT * FROM {TABLE_NAME}")
|
|
assert 0 < int(node.query("SELECT count() FROM system.filesystem_cache"))
|
|
|
|
# Check s3 is filled
|
|
assert (
|
|
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))) > 0
|
|
)
|
|
|
|
node.restart_clickhouse()
|
|
|
|
assert 100 == int(node.query(f"SELECT count() FROM {TABLE_NAME}"))
|
|
|
|
expected = """
|
|
SETTINGS disk = disk(type = cache, max_size = \\'[HIDDEN]\\', path = \\'[HIDDEN]\\', disk = disk(type = s3, endpoint = \\'[HIDDEN]\\'
|
|
"""
|
|
assert expected.strip() in node.query(f"SHOW CREATE TABLE {TABLE_NAME}").strip()
|
|
node.query(f"DROP TABLE {TABLE_NAME} SYNC")
|
|
|
|
|
|
def test_merge_tree_setting_override(start_cluster):
|
|
node = cluster.instances["node3"]
|
|
assert (
|
|
"MergeTree settings `storage_policy` and `disk` cannot be specified at the same time"
|
|
in node.query_and_get_error(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME};
|
|
CREATE TABLE {TABLE_NAME} (a Int32)
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple()
|
|
SETTINGS disk = 's3', storage_policy = 's3';
|
|
"""
|
|
)
|
|
)
|
|
|
|
assert "Unknown storage policy" in node.query_and_get_error(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME};
|
|
CREATE TABLE {TABLE_NAME} (a Int32)
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple();
|
|
"""
|
|
)
|
|
|
|
assert "Unknown disk" in node.query_and_get_error(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME};
|
|
CREATE TABLE {TABLE_NAME} (a Int32)
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple()
|
|
SETTINGS disk = 'kek';
|
|
"""
|
|
)
|
|
|
|
node.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME} SYNC;
|
|
CREATE TABLE {TABLE_NAME} (a Int32)
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple()
|
|
SETTINGS
|
|
disk = disk(
|
|
type=s3,
|
|
endpoint='http://minio1:9001/root/data/',
|
|
access_key_id='minio',
|
|
secret_access_key='minio123');
|
|
"""
|
|
)
|
|
|
|
minio = cluster.minio_client
|
|
node.query(f"INSERT INTO {TABLE_NAME} SELECT number FROM numbers(100)")
|
|
assert int(node.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
|
|
assert (
|
|
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))) > 0
|
|
)
|
|
|
|
node.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS {TABLE_NAME} SYNC;
|
|
CREATE TABLE {TABLE_NAME} (a Int32)
|
|
ENGINE = MergeTree()
|
|
ORDER BY tuple()
|
|
SETTINGS disk = 's3'
|
|
"""
|
|
)
|
|
|
|
minio = cluster.minio_client
|
|
node.query(f"INSERT INTO {TABLE_NAME} SELECT number FROM numbers(100)")
|
|
assert int(node.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
|
|
assert (
|
|
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))) > 0
|
|
)
|
|
node.query(f"DROP TABLE {TABLE_NAME} SYNC")
|