From 69900ba417637a01333326838fd58b690d0ab89e Mon Sep 17 00:00:00 2001
From: Jakub Kuklis
Date: Wed, 8 Dec 2021 12:57:44 +0100
Subject: [PATCH] Zero-copy replication support test

---
 .../__init__.py                               |  0
 .../configs/config.d/storage_conf.xml         | 46 ++++++++++
 .../test.py                                   | 83 +++++++++++++++++++
 .../blob_storage_mocks/unstable_proxy.py      | 64 --------------
 .../test_merge_tree_blob_storage/test.py      |  1 +
 5 files changed, 130 insertions(+), 64 deletions(-)
 create mode 100644 tests/integration/test_blob_storage_zero_copy_replication/__init__.py
 create mode 100644 tests/integration/test_blob_storage_zero_copy_replication/configs/config.d/storage_conf.xml
 create mode 100644 tests/integration/test_blob_storage_zero_copy_replication/test.py
 delete mode 100644 tests/integration/test_merge_tree_blob_storage/blob_storage_mocks/unstable_proxy.py

diff --git a/tests/integration/test_blob_storage_zero_copy_replication/__init__.py b/tests/integration/test_blob_storage_zero_copy_replication/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_blob_storage_zero_copy_replication/configs/config.d/storage_conf.xml b/tests/integration/test_blob_storage_zero_copy_replication/configs/config.d/storage_conf.xml
new file mode 100644
index 00000000000..73eae98a80d
--- /dev/null
+++ b/tests/integration/test_blob_storage_zero_copy_replication/configs/config.d/storage_conf.xml
@@ -0,0 +1,46 @@
+<clickhouse>
+    <storage_configuration>
+        <disks>
+            <blob_storage_disk>
+                <type>blob_storage</type>
+                <storage_account_url>http://azurite1:10000/devstoreaccount1</storage_account_url>
+                <container_name>cont</container_name>
+                <skip_access_check>false</skip_access_check>
+
+                <account_name>devstoreaccount1</account_name>
+                <account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
+            </blob_storage_disk>
+        </disks>
+        <policies>
+            <blob_storage_policy>
+                <volumes>
+                    <main>
+                        <disk>blob_storage_disk</disk>
+                    </main>
+                </volumes>
+            </blob_storage_policy>
+        </policies>
+    </storage_configuration>
+
+    <remote_servers>
+        <test_cluster>
+            <shard>
+                <replica>
+                    <host>node1</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>node2</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </test_cluster>
+    </remote_servers>
+
+    <macros>
+        <cluster>test_cluster</cluster>
+    </macros>
+</clickhouse>
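For context before the test itself: the `blob_storage_policy` defined above is what the table below selects through its `storage_policy` setting, while `remote_servers` and `macros` supply the `test_cluster` name used by the `ON CLUSTER` DDL. A quick way to confirm a node actually loaded this file is to query the standard `system.disks` and `system.storage_policies` tables; a minimal sketch (the helper name is hypothetical, the system tables are real):

    # Hypothetical helper: confirm storage_conf.xml was picked up by a node.
    def check_storage_configuration(node):
        disks = node.query("SELECT name FROM system.disks FORMAT TSV").strip().splitlines()
        assert "blob_storage_disk" in disks

        policies = node.query("SELECT policy_name FROM system.storage_policies FORMAT TSV").strip().splitlines()
        assert "blob_storage_policy" in policies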
diff --git a/tests/integration/test_blob_storage_zero_copy_replication/test.py b/tests/integration/test_blob_storage_zero_copy_replication/test.py
new file mode 100644
index 00000000000..08fb6e53e7b
--- /dev/null
+++ b/tests/integration/test_blob_storage_zero_copy_replication/test.py
@@ -0,0 +1,83 @@
+import logging
+import pytest
+from helpers.cluster import ClickHouseCluster
+
+
+logging.getLogger().setLevel(logging.INFO)
+logging.getLogger().addHandler(logging.StreamHandler())
+
+NODE1 = "node1"
+NODE2 = "node2"
+TABLE_NAME = "blob_storage_table"
+CONTAINER_NAME = "cont"
+CLUSTER_NAME = "test_cluster"
+
+
+@pytest.fixture(scope="module")
+def cluster():
+    try:
+        cluster = ClickHouseCluster(__file__)
+        cluster.add_instance(NODE1, main_configs=["configs/config.d/storage_conf.xml"], macros={'replica': '1'},
+                             with_azurite=True,
+                             with_zookeeper=True)
+        cluster.add_instance(NODE2, main_configs=["configs/config.d/storage_conf.xml"], macros={'replica': '2'},
+                             with_azurite=True,
+                             with_zookeeper=True)
+        logging.info("Starting cluster...")
+        cluster.start()
+        logging.info("Cluster started")
+
+        yield cluster
+    finally:
+        cluster.shutdown()
+
+
+def create_table(node, table_name, replica, **additional_settings):
+    settings = {
+        "storage_policy": "blob_storage_policy",
+        "old_parts_lifetime": 1,
+    }
+    settings.update(additional_settings)
+
+    create_table_statement = f"""
+        CREATE TABLE {table_name} ON CLUSTER {CLUSTER_NAME} (
+            id Int64,
+            data String
+        ) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{table_name}', '{{replica}}')
+        ORDER BY id
+        SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}"""
+
+    node.query(f"DROP TABLE IF EXISTS {table_name}")
+    node.query(create_table_statement)
+    assert node.query(f"SELECT COUNT(*) FROM {table_name} FORMAT Values") == "(0)"
+
+
+def get_large_objects_count(blob_container_client, large_size_threshold=100):
+    return sum(blob['size'] > large_size_threshold for blob in blob_container_client.list_blobs())
+
+
+def test_zero_copy_replication(cluster):
+    node1 = cluster.instances[NODE1]
+    node2 = cluster.instances[NODE2]
+    create_table(node1, TABLE_NAME, 1)
+
+    blob_container_client = cluster.blob_service_client.get_container_client(CONTAINER_NAME)
+
+    values1 = "(0,'data'),(1,'data')"
+    values2 = "(2,'data'),(3,'data')"
+
+    node1.query(f"INSERT INTO {TABLE_NAME} VALUES {values1}")
+    node2.query(f"SYSTEM SYNC REPLICA {TABLE_NAME}")
+    assert node1.query(f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values") == values1
+    assert node2.query(f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values") == values1
+
+    # Based on version 21.x - should be only one file with size 100+ (checksums.txt), used by both nodes
+    assert get_large_objects_count(blob_container_client) == 1
+
+    node2.query(f"INSERT INTO {TABLE_NAME} VALUES {values2}")
+    node1.query(f"SYSTEM SYNC REPLICA {TABLE_NAME}")
+
+    assert node2.query(f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values") == values1 + "," + values2
+    assert node1.query(f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values") == values1 + "," + values2
+
+    assert get_large_objects_count(blob_container_client) == 2
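The two `get_large_objects_count` assertions encode the zero-copy invariant: each INSERT produces one data part, and with zero-copy replication both replicas reference a single shared set of blobs for that part, of which only `checksums.txt` exceeds the 100-byte threshold (per the in-test comment, based on the 21.x on-disk layout). Without zero-copy, each replica would upload its own copy. A back-of-envelope sketch of that arithmetic, with illustrative constants rather than API values:

    # Expected number of >100-byte blobs: one per part, times one copy per
    # replica unless zero-copy replication lets the replicas share blobs.
    def expected_large_blobs(parts, replicas, zero_copy):
        return parts * (1 if zero_copy else replicas)

    assert expected_large_blobs(parts=1, replicas=2, zero_copy=True) == 1   # after values1
    assert expected_large_blobs(parts=2, replicas=2, zero_copy=True) == 2   # after values2
    assert expected_large_blobs(parts=2, replicas=2, zero_copy=False) == 4  # what full copies would give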
diff --git a/tests/integration/test_merge_tree_blob_storage/blob_storage_mocks/unstable_proxy.py b/tests/integration/test_merge_tree_blob_storage/blob_storage_mocks/unstable_proxy.py
deleted file mode 100644
index 1f8fcc4bbfd..00000000000
--- a/tests/integration/test_merge_tree_blob_storage/blob_storage_mocks/unstable_proxy.py
+++ /dev/null
@@ -1,64 +0,0 @@
-import http.client
-import http.server
-import random
-import socketserver
-import sys
-import urllib.parse
-
-
-UPSTREAM_HOST = "minio1:9001"
-random.seed("Unstable proxy/1.0")
-
-
-def request(command, url, headers={}, data=None):
-    """ Mini-requests. """
-    class Dummy:
-        pass
-
-    parts = urllib.parse.urlparse(url)
-    c = http.client.HTTPConnection(parts.hostname, parts.port)
-    c.request(command, urllib.parse.urlunparse(parts._replace(scheme='', netloc='')), headers=headers, body=data)
-    r = c.getresponse()
-    result = Dummy()
-    result.status_code = r.status
-    result.headers = r.headers
-    result.content = r.read()
-    return result
-
-
-class RequestHandler(http.server.BaseHTTPRequestHandler):
-    def do_GET(self):
-        if self.path == "/":
-            self.send_response(200)
-            self.send_header("Content-Type", "text/plain")
-            self.end_headers()
-            self.wfile.write(b"OK")
-        else:
-            self.do_HEAD()
-
-    def do_PUT(self):
-        self.do_HEAD()
-
-    def do_POST(self):
-        self.do_HEAD()
-
-    def do_HEAD(self):
-        content_length = self.headers.get("Content-Length")
-        data = self.rfile.read(int(content_length)) if content_length else None
-        r = request(self.command, f"http://{UPSTREAM_HOST}{self.path}", headers=self.headers, data=data)
-        self.send_response(r.status_code)
-        for k, v in r.headers.items():
-            self.send_header(k, v)
-        self.end_headers()
-        if random.random() < 0.25 and len(r.content) > 1024*1024:
-            r.content = r.content[:len(r.content)//2]
-        self.wfile.write(r.content)
-        self.wfile.close()
-
-
-class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
-    """Handle requests in a separate thread."""
-
-
-httpd = ThreadedHTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler)
-httpd.serve_forever()
diff --git a/tests/integration/test_merge_tree_blob_storage/test.py b/tests/integration/test_merge_tree_blob_storage/test.py
index ac0422af55d..b6f28a0f0c1 100644
--- a/tests/integration/test_merge_tree_blob_storage/test.py
+++ b/tests/integration/test_merge_tree_blob_storage/test.py
@@ -67,6 +67,7 @@ def test_create_table(cluster):
 def test_simple_insert_select(cluster):
     node = cluster.instances[NODE_NAME]
     create_table(node, TABLE_NAME)
+
     values = "('2021-11-13',3,'hello')"
     node.query(f"INSERT INTO {TABLE_NAME} VALUES {values}")
     assert node.query(f"SELECT dt, id, data FROM {TABLE_NAME} FORMAT Values") == values
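When the blob-count assertions fail, listing the container directly shows what each replica actually uploaded. The test reaches Azurite through `cluster.blob_service_client`, which the integration helpers build from the Azure SDK; a standalone sketch of the same listing, assuming the `azure-storage-blob` package and Azurite's well-known development credentials (the same account, key, and endpoint as in storage_conf.xml; `blob['size']` in the test works because `BlobProperties` is dict-like, `blob.size` below is the attribute form):

    from azure.storage.blob import BlobServiceClient

    # Azurite's published development-account defaults, matching storage_conf.xml.
    AZURITE_CONNECTION_STRING = (
        "DefaultEndpointsProtocol=http;"
        "AccountName=devstoreaccount1;"
        "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
        "BlobEndpoint=http://azurite1:10000/devstoreaccount1;"
    )

    def dump_container(container_name="cont", threshold=100):
        service = BlobServiceClient.from_connection_string(AZURITE_CONNECTION_STRING)
        container = service.get_container_client(container_name)
        # list_blobs() yields BlobProperties; size is the blob length in bytes.
        for blob in container.list_blobs():
            marker = "LARGE" if blob.size > threshold else "small"
            print(f"{marker} {blob.size:>8} {blob.name}")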