ClickHouse/tests/integration/test_encrypted_disk/test.py

110 lines
4.1 KiB
Python
Raw Normal View History

import pytest
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
FIRST_PART_NAME = "all_1_1_0"
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node",
main_configs=["configs/storage.xml"],
tmpfs=["/disk:size=100M"],
with_minio=True)
cluster.start()
yield cluster
finally:
cluster.shutdown()
@pytest.mark.parametrize("policy", ["encrypted_policy", "local_policy", "s3_policy"])
def test_encrypted_disk(cluster, policy):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE encrypted_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='{}'
""".format(policy)
)
node.query("INSERT INTO encrypted_test VALUES (0,'data'),(1,'data')")
select_query = "SELECT * FROM encrypted_test ORDER BY id FORMAT Values"
assert node.query(select_query) == "(0,'data'),(1,'data')"
node.query("INSERT INTO encrypted_test VALUES (2,'data'),(3,'data')")
node.query("OPTIMIZE TABLE encrypted_test FINAL")
assert node.query(select_query) == "(0,'data'),(1,'data'),(2,'data'),(3,'data')"
node.query("DROP TABLE IF EXISTS encrypted_test NO DELAY")
@pytest.mark.parametrize("policy,disk,encrypted_disk", [("local_policy", "disk_local", "disk_local_encrypted"), ("s3_policy", "disk_s3", "disk_s3_encrypted")])
def test_part_move(cluster, policy, disk, encrypted_disk):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE encrypted_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='{}'
""".format(policy)
)
node.query("INSERT INTO encrypted_test VALUES (0,'data'),(1,'data')")
select_query = "SELECT * FROM encrypted_test ORDER BY id FORMAT Values"
assert node.query(select_query) == "(0,'data'),(1,'data')"
node.query("ALTER TABLE encrypted_test MOVE PART '{}' TO DISK '{}'".format(FIRST_PART_NAME, encrypted_disk))
assert node.query(select_query) == "(0,'data'),(1,'data')"
with pytest.raises(QueryRuntimeException) as exc:
node.query("ALTER TABLE encrypted_test MOVE PART '{}' TO DISK '{}'".format(FIRST_PART_NAME, encrypted_disk))
assert("Part '{}' is already on disk '{}'".format(FIRST_PART_NAME, encrypted_disk) in str(exc.value))
node.query("ALTER TABLE encrypted_test MOVE PART '{}' TO DISK '{}'".format(FIRST_PART_NAME, disk))
assert node.query(select_query) == "(0,'data'),(1,'data')"
node.query("DROP TABLE IF EXISTS encrypted_test NO DELAY")
@pytest.mark.parametrize("policy,encrypted_disk", [("local_policy", "disk_local_encrypted"), ("s3_policy", "disk_s3_encrypted")])
def test_optimize_table(cluster, policy, encrypted_disk):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE encrypted_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='{}'
""".format(policy)
)
node.query("INSERT INTO encrypted_test VALUES (0,'data'),(1,'data')")
select_query = "SELECT * FROM encrypted_test ORDER BY id FORMAT Values"
assert node.query(select_query) == "(0,'data'),(1,'data')"
node.query("ALTER TABLE encrypted_test MOVE PART '{}' TO DISK '{}'".format(FIRST_PART_NAME, encrypted_disk))
assert node.query(select_query) == "(0,'data'),(1,'data')"
node.query("INSERT INTO encrypted_test VALUES (2,'data'),(3,'data')")
node.query("OPTIMIZE TABLE encrypted_test FINAL")
with pytest.raises(QueryRuntimeException) as exc:
node.query("ALTER TABLE encrypted_test MOVE PART '{}' TO DISK '{}'".format(FIRST_PART_NAME, encrypted_disk))
assert("Part {} is not exists or not active".format(FIRST_PART_NAME) in str(exc.value))
assert node.query(select_query) == "(0,'data'),(1,'data'),(2,'data'),(3,'data')"
node.query("DROP TABLE IF EXISTS encrypted_test NO DELAY")