Starting an integration test for Blob Storage

This commit is contained in:
Jakub Kuklis 2021-11-22 10:46:51 +01:00
parent 0c65cfed1f
commit 24bafe4740
6 changed files with 234 additions and 0 deletions

View File

@ -0,0 +1,64 @@
import http.client
import http.server
import random
import socketserver
import sys
import urllib.parse
UPSTREAM_HOST = "minio1:9001"
random.seed("Unstable proxy/1.0")
def request(command, url, headers={}, data=None):
""" Mini-requests. """
class Dummy:
pass
parts = urllib.parse.urlparse(url)
c = http.client.HTTPConnection(parts.hostname, parts.port)
c.request(command, urllib.parse.urlunparse(parts._replace(scheme='', netloc='')), headers=headers, body=data)
r = c.getresponse()
result = Dummy()
result.status_code = r.status
result.headers = r.headers
result.content = r.read()
return result
class RequestHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
if self.path == "/":
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(b"OK")
else:
self.do_HEAD()
def do_PUT(self):
self.do_HEAD()
def do_POST(self):
self.do_HEAD()
def do_HEAD(self):
content_length = self.headers.get("Content-Length")
data = self.rfile.read(int(content_length)) if content_length else None
r = request(self.command, f"http://{UPSTREAM_HOST}{self.path}", headers=self.headers, data=data)
self.send_response(r.status_code)
for k, v in r.headers.items():
self.send_header(k, v)
self.end_headers()
if random.random() < 0.25 and len(r.content) > 1024*1024:
r.content = r.content[:len(r.content)//2]
self.wfile.write(r.content)
self.wfile.close()
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
"""Handle requests in a separate thread."""
httpd = ThreadedHTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler)
httpd.serve_forever()

View File

@ -0,0 +1,5 @@
<clickhouse>
<background_processing_pool_thread_sleep_seconds>0.5</background_processing_pool_thread_sleep_seconds>
<background_processing_pool_task_sleep_seconds_when_no_work_min>0.5</background_processing_pool_task_sleep_seconds_when_no_work_min>
<background_processing_pool_task_sleep_seconds_when_no_work_max>0.5</background_processing_pool_task_sleep_seconds_when_no_work_max>
</clickhouse>

View File

@ -0,0 +1,41 @@
<yandex>
<storage_configuration>
<disks>
<my_blob_storage>
<type>blob_storage</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</my_blob_storage>
<unstable_blob_storage>
<type>blob_storage</type>
<endpoint>http://resolver:8081/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</unstable_blob_storage>
<hdd>
<type>local</type>
<path>/</path>
</hdd>
</disks>
<policies>
<p_blob_storage>
<volumes>
<my_blob_storage_vol>
<disk>my_blob_storage</disk>
</my_blob_storage_vol>
<external>
<disk>hdd</disk>
</external>
</volumes>
</p_blob_storage>
<unstable_blob_storage>
<volumes>
<main>
<disk>unstable_blob_storage</disk>
</main>
</volumes>
</unstable_blob_storage>
</policies>
</storage_configuration>
</yandex>

View File

@ -0,0 +1,20 @@
<?xml version="1.0"?>
<clickhouse>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
</clickhouse>

View File

@ -0,0 +1,104 @@
import logging
import os
import threading
import pytest
import time
import string
from helpers.cluster import ClickHouseCluster, get_instances_dir
class SafeThread(threading.Thread):
def __init__(self, target):
super().__init__()
self.target = target
self.exception = None
def run(self):
try:
self.target()
except Exception as e: # pylint: disable=broad-except
self.exception = e
def join(self, timeout=None):
super().join(timeout)
if self.exception:
raise self.exception
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, './{}/node/configs/config.d/storage_conf.xml'.format(get_instances_dir()))
def run_blob_storage_mocks(cluster):
logging.info("Starting Blob Storage mocks")
mocks = (
("unstable_proxy.py", "resolver", "8081"),
)
for mock_filename, container, port in mocks:
container_id = cluster.get_container_id(container)
current_dir = os.path.dirname(__file__)
cluster.copy_file_to_container(container_id, os.path.join(current_dir, "blob_storage_mocks", mock_filename), mock_filename)
cluster.exec_in_container(container_id, ["python", mock_filename, port], detach=True)
# Wait for Blob Storage mocks to start
for mock_filename, container, port in mocks:
num_attempts = 100
for attempt in range(num_attempts):
ping_response = cluster.exec_in_container(cluster.get_container_id(container),
["curl", "-s", f"http://localhost:{port}/"], nothrow=True)
if ping_response != "OK":
if attempt == num_attempts - 1:
assert ping_response == "OK", f'Expected "OK", but got "{ping_response}"'
else:
time.sleep(1)
else:
logging.debug(f"mock {mock_filename} ({port}) answered {ping_response} on attempt {attempt}")
break
logging.info("Blob Storage mocks started")
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node",
main_configs=[
"configs/config.d/storage_conf.xml",
"configs/config.d/bg_processing_pool_conf.xml"],
with_minio=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
run_blob_storage_mocks(cluster)
yield cluster
finally:
cluster.shutdown()
def create_table(node, table_name, **additional_settings):
settings = {
"storage_policy": "p_blob_storage",
"index_granularity": 512
}
settings.update(additional_settings)
create_table_statement = f"""
CREATE TABLE {table_name} (
dt Date,
id Int64,
data String,
INDEX min_max (id) TYPE minmax GRANULARITY 3
) ENGINE=MergeTree()
PARTITION BY dt
ORDER BY (dt, id)
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}"""
node.query(f"DROP TABLE IF EXISTS {table_name}")
node.query(create_table_statement)
def test_simple(cluster):
node = cluster.instances["node"]
create_table(node, "blob_storage_test")
minio = cluster.minio_client
pass