ClickHouse/tests/integration/test_replicated_merge_tree_encryption_codec/test.py
2024-09-27 10:19:49 +00:00

133 lines
3.5 KiB
Python

import os
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV, assert_eq_with_retry
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/remote_servers.xml", "configs/encryption_codec.xml"],
macros={"replica": "node1"},
with_zookeeper=True,
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/remote_servers.xml", "configs/encryption_codec.xml"],
macros={"replica": "node2"},
with_zookeeper=True,
)
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield
finally:
cluster.shutdown()
def copy_keys(instance, keys_file_name):
instance.copy_file_to_container(
os.path.join(SCRIPT_DIR, f"configs/{keys_file_name}.xml"),
"/etc/clickhouse-server/config.d/z_keys.xml",
)
instance.query("SYSTEM RELOAD CONFIG")
def create_table():
node1.query("DROP TABLE IF EXISTS tbl ON CLUSTER 'cluster' SYNC")
node1.query(
"""
CREATE TABLE tbl ON CLUSTER 'cluster' (
id Int64,
str String Codec(AES_128_GCM_SIV)
) ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')
ORDER BY id
"""
)
def insert_data():
node1.query("INSERT INTO tbl VALUES (1, 'str1')")
node2.query("INSERT INTO tbl VALUES (1, 'str1')") # Test deduplication
node2.query("INSERT INTO tbl VALUES (2, 'str2')")
def optimize_table():
node1.query("OPTIMIZE TABLE tbl ON CLUSTER 'cluster' FINAL")
def check_table():
expected = [[1, "str1"], [2, "str2"]]
node1.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' tbl")
assert node1.query("SELECT * FROM tbl ORDER BY id") == TSV(expected)
assert node2.query("SELECT * FROM tbl ORDER BY id") == TSV(expected)
assert node1.query("CHECK TABLE tbl") == "1\n"
assert node2.query("CHECK TABLE tbl") == "1\n"
# Actual tests:
def test_same_keys():
copy_keys(node1, "key_a")
copy_keys(node2, "key_a")
create_table()
insert_data()
check_table()
optimize_table()
check_table()
def test_different_keys():
copy_keys(node1, "key_a")
copy_keys(node2, "key_b")
create_table()
# Insert two blocks without duplicated blocks to force each replica to actually fetch parts from another replica.
node1.query("INSERT INTO tbl VALUES (1, 'str1')")
node2.query("INSERT INTO tbl VALUES (2, 'str2')")
node1.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' tbl")
# After "SYSTEM SYNC REPLICA" we expect node1 and node2 here both having a part for (1, 'str1') encrypted with "key_a",
# and a part for (2, 'str2') encrypted with "key_b".
# So the command "SELECT * from tbl" must fail on both nodes because each node has only one encryption key.
assert "OPENSSL_ERROR" in node1.query_and_get_error("SELECT * FROM tbl")
assert "OPENSSL_ERROR" in node2.query_and_get_error("SELECT * FROM tbl")
# Hang?
# optimize_table()
# check_table()
def test_different_current_key_ids():
copy_keys(node1, "key_a_and_b_current_a")
copy_keys(node2, "key_a_and_b_current_b")
create_table()
insert_data()
check_table()
optimize_table()
check_table()
def test_different_nonces():
copy_keys(node1, "key_a_and_nonce_x")
copy_keys(node2, "key_a_and_nonce_y")
create_table()
insert_data()
check_table()
optimize_table()
check_table()