Zero-copy replication support test

This commit is contained in:
Jakub Kuklis 2021-12-08 12:57:44 +01:00
parent b5db3a307f
commit 69900ba417
5 changed files with 130 additions and 64 deletions

View File

@ -0,0 +1,46 @@
<clickhouse>
<!-- Test configuration: one blob-storage disk with a storage policy that uses it,
     a two-node cluster definition, and a "cluster" macro. -->
<storage_configuration>
<disks>
<!-- Disk of type blob_storage pointing at the azurite1:10000 endpoint, container "cont". -->
<blob_storage_disk>
<type>blob_storage</type>
<storage_account_url>http://azurite1:10000/devstoreaccount1</storage_account_url>
<container_name>cont</container_name>
<skip_access_check>false</skip_access_check>
<!-- default credentials for Azurite storage account -->
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
</blob_storage_disk>
</disks>
<policies>
<!-- Policy with a single volume ("main") that contains only blob_storage_disk. -->
<blob_storage_policy>
<volumes>
<main>
<disk>blob_storage_disk</disk>
</main>
</volumes>
</blob_storage_policy>
</policies>
</storage_configuration>
<!-- Cluster "test_cluster": node1 and node2, each declared as its own single-replica shard.
     NOTE(review): the accompanying test uses both nodes as replicas of one table;
     confirm that listing them as separate shards here is intended. -->
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
<!-- Defines the "cluster" macro with the value test_cluster. -->
<macros>
<cluster>test_cluster</cluster>
</macros>
</clickhouse>

View File

@ -0,0 +1,83 @@
import logging
import pytest
from helpers.cluster import ClickHouseCluster
# Let INFO-level (and higher) records through the root logger and attach a
# stream handler so the fixture's progress messages are emitted.
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
# Names under which the two ClickHouse instances are registered in the cluster.
NODE1 = "node1"
NODE2 = "node2"
# Table created and queried by the test.
TABLE_NAME = "blob_storage_table"
# Blob container whose objects the test counts; same value as <container_name>
# in the storage configuration added by this change.
CONTAINER_NAME = "cont"
# Cluster name used in ON CLUSTER statements; same value as the cluster
# declared under <remote_servers> in the configuration added by this change.
CLUSTER_NAME = "test_cluster"
@pytest.fixture(scope="module")
def cluster():
    """Start a two-node ClickHouse cluster once per module and yield it.

    Both instances are registered with the same storage configuration file,
    with Azurite and ZooKeeper requested, and differ only in the value of
    their ``replica`` macro ('1' for NODE1, '2' for NODE2).

    Yields:
        The started ``ClickHouseCluster``; it is shut down when the module's
        tests finish or when set-up fails part-way through.
    """
    # Build the object outside the ``try``: if the constructor itself raises,
    # there is nothing to shut down, and referencing the unbound name in
    # ``finally`` would replace the real error with an UnboundLocalError.
    cluster = ClickHouseCluster(__file__)
    try:
        for instance_name, replica_macro in ((NODE1, '1'), (NODE2, '2')):
            cluster.add_instance(
                instance_name,
                main_configs=["configs/config.d/storage_conf.xml"],
                macros={'replica': replica_macro},
                with_azurite=True,
                with_zookeeper=True,
            )
        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")
        yield cluster
    finally:
        cluster.shutdown()
def create_table(node, table_name, replica, **additional_settings):
    """(Re)create the replicated test table on every node of the cluster.

    Args:
        node: Instance used to issue the statements; must expose ``query``.
        table_name: Name of the table; also used in the ZooKeeper path.
        replica: Currently unused. The replica name in the engine definition
            is the literal ``{replica}`` macro, not this argument. Kept so
            existing callers keep working.
        **additional_settings: Extra table settings; they override the
            defaults (``storage_policy`` and ``old_parts_lifetime``).

    Raises:
        AssertionError: If the freshly created table is not empty on ``node``.
    """
    settings = {
        "storage_policy": "blob_storage_policy",
        "old_parts_lifetime": 1,
    }
    settings.update(additional_settings)
    # repr() quotes string values and leaves numbers bare, as SETTINGS expects.
    settings_clause = ",".join(f"{key}={value!r}" for key, value in settings.items())

    # '{{replica}}' is an escaped brace pair: the statement carries the
    # literal text {replica}, not the ``replica`` argument.
    create_table_statement = f"""
        CREATE TABLE {table_name} ON CLUSTER {CLUSTER_NAME} (
            id Int64,
            data String
        ) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{table_name}', '{{replica}}')
        ORDER BY id
        SETTINGS {settings_clause}"""

    # The table is created ON CLUSTER, so it must be dropped ON CLUSTER too;
    # dropping it only on `node` would leave a stale copy on the other node
    # and make the CREATE below fail there. SYNC waits for the drop to finish
    # so the same table can be re-created immediately.
    node.query(f"DROP TABLE IF EXISTS {table_name} ON CLUSTER {CLUSTER_NAME} SYNC")
    node.query(create_table_statement)
    assert node.query(f"SELECT COUNT(*) FROM {table_name} FORMAT Values") == "(0)"
def get_large_objects_count(blob_container_client, large_size_threshold=100):
    """Return how many blobs in the container are larger than the threshold.

    Args:
        blob_container_client: Object whose ``list_blobs()`` yields items
            that can be indexed with ``'size'``.
        large_size_threshold: A blob counts as large when its size is
            strictly greater than this value.
    """
    large_blobs = [
        entry
        for entry in blob_container_client.list_blobs()
        if entry['size'] > large_size_threshold
    ]
    return len(large_blobs)
def test_zero_copy_replication(cluster):
    """Insert on each node in turn and check the data and the blob count.

    After every insert the other node is synced, both nodes must return the
    same rows, and the number of large objects in the container must match
    the number of inserts made so far.
    """
    first_node = cluster.instances[NODE1]
    second_node = cluster.instances[NODE2]
    create_table(first_node, TABLE_NAME, 1)
    container = cluster.blob_service_client.get_container_client(CONTAINER_NAME)

    def insert(node, rows):
        node.query(f"INSERT INTO {TABLE_NAME} VALUES {rows}")

    def sync(node):
        node.query(f"SYSTEM SYNC REPLICA {TABLE_NAME}")

    def select_all(node):
        return node.query(f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values")

    first_batch = "(0,'data'),(1,'data')"
    second_batch = "(2,'data'),(3,'data')"
    both_batches = first_batch + "," + second_batch

    # Round 1: write through the first node, read from both.
    insert(first_node, first_batch)
    sync(second_node)
    assert select_all(first_node) == first_batch
    assert select_all(second_node) == first_batch
    # Based on version 21.x: only one object above the size threshold is
    # expected (checksums.txt), and it is used by both nodes.
    assert get_large_objects_count(container) == 1

    # Round 2: write through the second node, read from both.
    insert(second_node, second_batch)
    sync(first_node)
    assert select_all(second_node) == both_batches
    assert select_all(first_node) == both_batches
    assert get_large_objects_count(container) == 2

View File

@ -1,64 +0,0 @@
import http.client
import http.server
import random
import socketserver
import sys
import urllib.parse
# host:port that every proxied request is forwarded to.
UPSTREAM_HOST = "minio1:9001"
# Fixed seed for the module-level random generator, which decides when a
# response body gets truncated.
random.seed("Unstable proxy/1.0")
def request(command, url, headers=None, data=None):
    """Mini-requests: perform one plain-HTTP request and return the response.

    Args:
        command: HTTP method, e.g. ``"GET"``.
        url: Absolute URL; host and port are used for the connection, the
            remainder (path, params, query, fragment) is the request target.
        headers: Optional mapping of request headers. Defaults to none.
        data: Optional request body.

    Returns:
        An object with ``status_code`` (int), ``headers`` (the response
        header object from ``http.client``) and ``content`` (the complete
        body as bytes).
    """
    class Dummy:
        pass

    parts = urllib.parse.urlparse(url)
    target = urllib.parse.urlunparse(parts._replace(scheme='', netloc=''))
    # ``None`` instead of a shared ``{}`` default avoids the mutable-default
    # pitfall while staying compatible with callers that pass a mapping.
    request_headers = {} if headers is None else headers

    c = http.client.HTTPConnection(parts.hostname, parts.port)
    try:
        c.request(command, target, headers=request_headers, body=data)
        r = c.getresponse()
        result = Dummy()
        result.status_code = r.status
        result.headers = r.headers
        # Read the whole body before the connection is closed below.
        result.content = r.read()
        return result
    finally:
        # Always release the socket, including when the request fails.
        c.close()
class RequestHandler(http.server.BaseHTTPRequestHandler):
    """Forward requests to UPSTREAM_HOST, occasionally truncating the reply.

    ``GET /`` is answered locally with a plain-text ``OK``. Every other GET,
    and every PUT, POST and HEAD, is relayed upstream by ``do_HEAD``.
    """

    def do_GET(self):
        # Anything but the root path is simply proxied.
        if self.path != "/":
            self.do_HEAD()
            return
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.end_headers()
        self.wfile.write(b"OK")

    def do_PUT(self):
        self.do_HEAD()

    def do_POST(self):
        self.do_HEAD()

    def do_HEAD(self):
        """Relay the current request upstream and write back the response."""
        length_header = self.headers.get("Content-Length")
        if length_header:
            payload = self.rfile.read(int(length_header))
        else:
            payload = None

        upstream = request(
            self.command,
            f"http://{UPSTREAM_HOST}{self.path}",
            headers=self.headers,
            data=payload,
        )

        self.send_response(upstream.status_code)
        for header_name, header_value in upstream.headers.items():
            self.send_header(header_name, header_value)
        self.end_headers()

        body = upstream.content
        # The random draw happens for every response; only bodies above 1 MiB
        # are cut, and then to half their length. The upstream headers have
        # already been sent unchanged at this point.
        if random.random() < 0.25 and len(body) > 1024*1024:
            body = body[:len(body)//2]
        self.wfile.write(body)
        self.wfile.close()
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """Handle requests in a separate thread.

    Combines ``socketserver.ThreadingMixIn`` with ``http.server.HTTPServer``
    and adds no behaviour of its own.
    """
# Bind to all interfaces on the port given as the first command-line argument.
httpd = ThreadedHTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler)
# Serve requests; this call does not return under normal operation.
httpd.serve_forever()

View File

@ -67,6 +67,7 @@ def test_create_table(cluster):
def test_simple_insert_select(cluster):
node = cluster.instances[NODE_NAME]
create_table(node, TABLE_NAME)
values = "('2021-11-13',3,'hello')"
node.query(f"INSERT INTO {TABLE_NAME} VALUES {values}")
assert node.query(f"SELECT dt, id, data FROM {TABLE_NAME} FORMAT Values") == values