"""Integration tests for replicating a table that uses an encryption codec.

Two replicas (``node1`` and ``node2``) share a ``ReplicatedMergeTree`` table
whose ``str`` column is stored with ``Codec(AES_128_GCM_SIV)``. Each test
installs a keys config on every replica and then checks how the replicated
data behaves.
"""

import os

import pytest

from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry, TSV

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))

cluster = ClickHouseCluster(__file__)


def _add_replica(name):
    """Register a ZooKeeper-backed instance whose ``replica`` macro is *name*."""
    return cluster.add_instance(
        name,
        main_configs=["configs/remote_servers.xml", "configs/encryption_codec.xml"],
        macros={"replica": name},
        with_zookeeper=True,
    )


node1 = _add_replica("node1")
node2 = _add_replica("node2")


@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    """Start the cluster once per module and shut it down when the module ends."""
    try:
        cluster.start()
        yield
    finally:
        # Runs even if start-up or a test failed, so the cluster is never left up.
        cluster.shutdown()


def copy_keys(instance, keys_file_name):
    """Install ``configs/<keys_file_name>.xml`` on *instance* as its keys config.

    The file is copied to ``/etc/clickhouse-server/config.d/z_keys.xml`` inside
    the container, then the server is asked to reload its configuration.
    """
    local_path = os.path.join(SCRIPT_DIR, f"configs/{keys_file_name}.xml")
    container_path = "/etc/clickhouse-server/config.d/z_keys.xml"
    instance.copy_file_to_container(local_path, container_path)
    instance.query("SYSTEM RELOAD CONFIG")


def create_table():
    """Drop ``tbl`` on the whole cluster (if present) and create it again."""
    drop_query = "DROP TABLE IF EXISTS tbl ON CLUSTER 'cluster' SYNC"
    # Adjacent literals are concatenated into one single-line statement;
    # '{replica}' is left as-is in the query text (this is not an f-string).
    create_query = (
        " CREATE TABLE tbl ON CLUSTER 'cluster' ("
        " id Int64,"
        " str String Codec(AES_128_GCM_SIV)"
        " )"
        " ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')"
        " ORDER BY id "
    )
    node1.query(drop_query)
    node1.query(create_query)


def insert_data():
    """Insert rows 1 and 2, sending row 1 to both replicas."""
    statements = (
        (node1, "INSERT INTO tbl VALUES (1, 'str1')"),
        # Same row again, this time through the other replica: test deduplication.
        (node2, "INSERT INTO tbl VALUES (1, 'str1')"),
        (node2, "INSERT INTO tbl VALUES (2, 'str2')"),
    )
    for node, statement in statements:
        node.query(statement)


def optimize_table():
    """Run ``OPTIMIZE ... FINAL`` for ``tbl`` on the whole cluster."""
    node1.query("OPTIMIZE TABLE tbl ON CLUSTER 'cluster' FINAL")


def check_table():
    """Sync the replicas, then verify contents and ``CHECK TABLE`` on both."""
    expected = [[1, "str1"], [2, "str2"]]
    replicas = (node1, node2)

    node1.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' tbl")

    # Both replicas must return exactly the two expected rows ...
    for node in replicas:
        assert node.query("SELECT * FROM tbl ORDER BY id") == TSV(expected)

    # ... and both must report the table as healthy.
    for node in replicas:
        assert node.query("CHECK TABLE tbl") == "1\n"


def _check_replication_roundtrip(node1_keys, node2_keys):
    """Shared scenario: install keys, fill the table, verify before and after OPTIMIZE.

    *node1_keys* / *node2_keys* are the key config names (without ``.xml``)
    passed to ``copy_keys`` for the respective replica.
    """
    copy_keys(node1, node1_keys)
    copy_keys(node2, node2_keys)
    create_table()
    insert_data()
    check_table()
    optimize_table()
    check_table()


# Actual tests:


def test_same_keys():
    """Both replicas use the ``key_a`` config; data must be readable everywhere."""
    _check_replication_roundtrip("key_a", "key_a")


def test_different_keys():
    """Replicas with different key configs must fail to read the synced table."""
    copy_keys(node1, "key_a")
    copy_keys(node2, "key_b")
    create_table()

    # Insert two blocks without duplicated blocks to force each replica to
    # actually fetch parts from another replica.
    node1.query("INSERT INTO tbl VALUES (1, 'str1')")
    node2.query("INSERT INTO tbl VALUES (2, 'str2')")
    node1.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' tbl")

    # After "SYSTEM SYNC REPLICA" we expect node1 and node2 to both have a part
    # for (1, 'str1') encrypted with "key_a" and a part for (2, 'str2')
    # encrypted with "key_b". So "SELECT * FROM tbl" must fail on both nodes,
    # because each node has only one encryption key.
    for node in (node1, node2):
        assert "OPENSSL_ERROR" in node.query_and_get_error("SELECT * FROM tbl")

    # NOTE: optimize_table() and check_table() are intentionally not called
    # here; they were suspected to hang in this mixed-key state (unconfirmed).


def test_different_current_key_ids():
    """Replicas use the ``key_a_and_b_current_a`` / ``..._current_b`` configs."""
    _check_replication_roundtrip("key_a_and_b_current_a", "key_a_and_b_current_b")


def test_different_nonces():
    """Replicas use the ``key_a_and_nonce_x`` / ``key_a_and_nonce_y`` configs."""
    _check_replication_roundtrip("key_a_and_nonce_x", "key_a_and_nonce_y")