import pytest
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
from helpers.test_tools import assert_eq_with_retry
FIRST_PART_NAME = "all_1_1_0"
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/storage.xml"],
tmpfs=["/disk:size=100M"],
with_minio=True,
stay_alive=True,
)
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def cleanup_after_test():
try:
yield
finally:
node.query("DROP TABLE IF EXISTS encrypted_test NO DELAY")
@pytest.mark.parametrize(
"policy",
["encrypted_policy", "encrypted_policy_key192b", "local_policy", "s3_policy"],
)
def test_encrypted_disk(policy):
node.query(
"""
CREATE TABLE encrypted_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='{}'
""".format(
policy
)
)
node.query("INSERT INTO encrypted_test VALUES (0,'data'),(1,'data')")
select_query = "SELECT * FROM encrypted_test ORDER BY id FORMAT Values"
assert node.query(select_query) == "(0,'data'),(1,'data')"
node.query("INSERT INTO encrypted_test VALUES (2,'data'),(3,'data')")
node.query("OPTIMIZE TABLE encrypted_test FINAL")
assert node.query(select_query) == "(0,'data'),(1,'data'),(2,'data'),(3,'data')"
@pytest.mark.parametrize(
"policy, destination_disks",
[
(
"local_policy",
[
"disk_local_encrypted",
"disk_local_encrypted2",
"disk_local_encrypted_key192b",
"disk_local",
],
),
("s3_policy", ["disk_s3_encrypted", "disk_s3"]),
],
)
def test_part_move(policy, destination_disks):
node.query(
"""
CREATE TABLE encrypted_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='{}'
""".format(
policy
)
)
node.query("INSERT INTO encrypted_test VALUES (0,'data'),(1,'data')")
select_query = "SELECT * FROM encrypted_test ORDER BY id FORMAT Values"
assert node.query(select_query) == "(0,'data'),(1,'data')"
for destination_disk in destination_disks:
node.query(
"ALTER TABLE encrypted_test MOVE PART '{}' TO DISK '{}'".format(
FIRST_PART_NAME, destination_disk
)
)
assert node.query(select_query) == "(0,'data'),(1,'data')"
with pytest.raises(QueryRuntimeException) as exc:
node.query(
"ALTER TABLE encrypted_test MOVE PART '{}' TO DISK '{}'".format(
FIRST_PART_NAME, destination_disk
)
)
assert "Part '{}' is already on disk '{}'".format(
FIRST_PART_NAME, destination_disk
) in str(exc.value)
assert node.query(select_query) == "(0,'data'),(1,'data')"
@pytest.mark.parametrize(
"policy,encrypted_disk",
[("local_policy", "disk_local_encrypted"), ("s3_policy", "disk_s3_encrypted")],
)
def test_optimize_table(policy, encrypted_disk):
node.query(
"""
CREATE TABLE encrypted_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='{}'
""".format(
policy
)
)
node.query("INSERT INTO encrypted_test VALUES (0,'data'),(1,'data')")
select_query = "SELECT * FROM encrypted_test ORDER BY id FORMAT Values"
assert node.query(select_query) == "(0,'data'),(1,'data')"
node.query(
"ALTER TABLE encrypted_test MOVE PART '{}' TO DISK '{}'".format(
FIRST_PART_NAME, encrypted_disk
)
)
assert node.query(select_query) == "(0,'data'),(1,'data')"
node.query("INSERT INTO encrypted_test VALUES (2,'data'),(3,'data')")
node.query("OPTIMIZE TABLE encrypted_test FINAL")
with pytest.raises(QueryRuntimeException) as exc:
node.query(
"ALTER TABLE encrypted_test MOVE PART '{}' TO DISK '{}'".format(
FIRST_PART_NAME, encrypted_disk
)
)
assert "Part {} is not exists or not active".format(FIRST_PART_NAME) in str(
exc.value
)
assert node.query(select_query) == "(0,'data'),(1,'data'),(2,'data'),(3,'data')"
# Test adding encryption key on the fly.
def test_add_key():
def make_storage_policy_with_keys(policy_name, keys):
node.exec_in_container(
[
"bash",
"-c",
"""cat > /etc/clickhouse-server/config.d/storage_policy_{policy_name}.xml << EOF
<{policy_name}_disk>
encrypted
disk_local
{policy_name}_dir/
{keys}
{policy_name}_disk>
<{policy_name}>
{policy_name}_disk
{policy_name}>
EOF""".format(
policy_name=policy_name, keys=keys
),
]
)
node.query("SYSTEM RELOAD CONFIG")
# Add some data to an encrypted disk.
node.query("SELECT policy_name FROM system.storage_policies")
make_storage_policy_with_keys(
"encrypted_policy_multikeys", "firstfirstfirstf"
)
assert_eq_with_retry(
node,
"SELECT policy_name FROM system.storage_policies WHERE policy_name='encrypted_policy_multikeys'",
"encrypted_policy_multikeys",
)
node.query(
"""
CREATE TABLE encrypted_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='encrypted_policy_multikeys'
"""
)
node.query("INSERT INTO encrypted_test VALUES (0,'data'),(1,'data')")
select_query = "SELECT * FROM encrypted_test ORDER BY id FORMAT Values"
assert node.query(select_query) == "(0,'data'),(1,'data')"
# Add a second key and start using it.
make_storage_policy_with_keys(
"encrypted_policy_multikeys",
"""
firstfirstfirstf
secondsecondseco
1
""",
)
node.query("INSERT INTO encrypted_test VALUES (2,'data'),(3,'data')")
# Now "(0,'data'),(1,'data')" is encrypted with the first key and "(2,'data'),(3,'data')" is encrypted with the second key.
# All data are accessible.
assert node.query(select_query) == "(0,'data'),(1,'data'),(2,'data'),(3,'data')"
# Try to replace the first key with something wrong, and check that "(0,'data'),(1,'data')" cannot be read.
make_storage_policy_with_keys(
"encrypted_policy_multikeys",
"""
wrongwrongwrongw
secondsecondseco
1
""",
)
expected_error = "Wrong key"
assert expected_error in node.query_and_get_error(select_query)
# Detach the part encrypted with the wrong key and check that another part containing "(2,'data'),(3,'data')" still can be read.
node.query("ALTER TABLE encrypted_test DETACH PART '{}'".format(FIRST_PART_NAME))
assert node.query(select_query) == "(2,'data'),(3,'data')"
def test_read_in_order():
node.query(
"CREATE TABLE encrypted_test(`a` UInt64, `b` String(150)) ENGINE = MergeTree() ORDER BY (a, b) SETTINGS storage_policy='encrypted_policy'"
)
node.query(
"INSERT INTO encrypted_test SELECT * FROM generateRandom('a UInt64, b FixedString(150)') LIMIT 100000"
)
node.query(
"SELECT * FROM encrypted_test ORDER BY a, b SETTINGS optimize_read_in_order=1 FORMAT Null"
)
node.query(
"SELECT * FROM encrypted_test ORDER BY a, b SETTINGS optimize_read_in_order=0 FORMAT Null"
)
def test_restart():
for policy in ["disk_s3_encrypted_default_path", "encrypted_s3_cache"]:
node.query(
f"""
DROP TABLE IF EXISTS encrypted_test;
CREATE TABLE encrypted_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS disk='{policy}'
"""
)
node.query("INSERT INTO encrypted_test VALUES (0,'data'),(1,'data')")
select_query = "SELECT * FROM encrypted_test ORDER BY id FORMAT Values"
assert node.query(select_query) == "(0,'data'),(1,'data')"
node.restart_clickhouse()
assert node.query(select_query) == "(0,'data'),(1,'data')"
node.query("DROP TABLE encrypted_test NO DELAY;")