ClickHouse/tests/integration/test_encrypted_disk_replication/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

93 lines
2.3 KiB
Python
Raw Normal View History

import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=[
"configs/cluster.xml",
"configs/disk_s3_encrypted.xml",
"configs/disk_s3_encrypted_node1.xml",
],
macros={"replica": "node1"},
with_zookeeper=True,
with_minio=True,
)
node2 = cluster.add_instance(
"node2",
main_configs=[
"configs/cluster.xml",
"configs/disk_s3_encrypted.xml",
"configs/disk_s3_encrypted_node2.xml",
],
macros={"replica": "node2"},
with_zookeeper=True,
with_minio=True,
)
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def cleanup_after_test():
try:
yield
finally:
node1.query("DROP TABLE IF EXISTS encrypted_test ON CLUSTER 'cluster' NO DELAY")
def create_table(
zero_copy_replication=False, storage_policy="s3_encrypted_policy_with_diff_keys"
):
engine = "ReplicatedMergeTree('/clickhouse/tables/encrypted_test/', '{replica}')"
settings = f"storage_policy='{storage_policy}'"
if zero_copy_replication:
settings += ", allow_remote_fs_zero_copy_replication=true"
node1.query(
f"""
CREATE TABLE encrypted_test ON CLUSTER 'cluster' (
id Int64,
data String
) ENGINE={engine}
ORDER BY id
SETTINGS {settings}
"""
)
def check_replication():
node1.query("INSERT INTO encrypted_test VALUES (0, 'a'), (1, 'b')")
node2.query("INSERT INTO encrypted_test VALUES (2, 'c'), (3, 'd')")
node1.query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' encrypted_test")
select_query = "SELECT * FROM encrypted_test ORDER BY id"
assert node1.query(select_query) == TSV([[0, "a"], [1, "b"], [2, "c"], [3, "d"]])
assert node2.query(select_query) == TSV([[0, "a"], [1, "b"], [2, "c"], [3, "d"]])
def test_replication():
create_table(
zero_copy_replication=False, storage_policy="s3_encrypted_policy_with_diff_keys"
)
check_replication()
def test_zero_copy_replication():
create_table(zero_copy_replication=True, storage_policy="s3_encrypted_policy")
check_replication()