S3 disk unstable reads test.
commit 963cfc1694
parent 0aec151719
@@ -8,6 +8,13 @@
             <secret_access_key>minio123</secret_access_key>
             <s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
         </s3>
+        <unstable_s3>
+            <type>s3</type>
+            <endpoint>http://resolver:8081/root/data/</endpoint>
+            <access_key_id>minio</access_key_id>
+            <secret_access_key>minio123</secret_access_key>
+            <s3_max_single_read_retries>10</s3_max_single_read_retries>
+        </unstable_s3>
         <hdd>
             <type>local</type>
             <path>/</path>
@@ -24,6 +31,13 @@
                 </external>
             </volumes>
         </s3>
+        <unstable_s3>
+            <volumes>
+                <main>
+                    <disk>unstable_s3</disk>
+                </main>
+            </volumes>
+        </unstable_s3>
     </policies>
 </storage_configuration>

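The unstable_s3 disk points at http://resolver:8081/, i.e. the mock proxy added below, rather than at MinIO directly; s3_max_single_read_retries=10 gives the disk room to retry reads that the proxy truncates. A minimal sketch of how a test table lands on this policy, using the create_table helper from the test file further down (the INSERT values here are illustrative only):

    # Sketch: put a test table on the unstable_s3 policy (create_table is defined below).
    create_table(cluster, "s3_test", storage_policy='unstable_s3')
    node = cluster.instances["node"]
    node.query("INSERT INTO s3_test VALUES ('2020-01-03', 1, 'data')")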
@@ -0,0 +1,64 @@
+import http.client
+import http.server
+import random
+import socketserver
+import sys
+import urllib.parse
+
+
+UPSTREAM_HOST = "minio1:9001"
+random.seed("Unstable proxy/1.0")
+
+
+def request(command, url, headers={}, data=None):
+    """ Mini-requests. """
+    class Dummy:
+        pass
+
+    parts = urllib.parse.urlparse(url)
+    c = http.client.HTTPConnection(parts.hostname, parts.port)
+    c.request(command, urllib.parse.urlunparse(parts._replace(scheme='', netloc='')), headers=headers, body=data)
+    r = c.getresponse()
+    result = Dummy()
+    result.status_code = r.status
+    result.headers = r.headers
+    result.content = r.read()
+    return result
+
+
+class RequestHandler(http.server.BaseHTTPRequestHandler):
+    def do_GET(self):
+        if self.path == "/":
+            self.send_response(200)
+            self.send_header("Content-Type", "text/plain")
+            self.end_headers()
+            self.wfile.write(b"OK")
+        else:
+            self.do_HEAD()
+
+    def do_PUT(self):
+        self.do_HEAD()
+
+    def do_POST(self):
+        self.do_HEAD()
+
+    def do_HEAD(self):
+        content_length = self.headers.get("Content-Length")
+        data = self.rfile.read(int(content_length)) if content_length else None
+        r = request(self.command, f"http://{UPSTREAM_HOST}{self.path}", headers=self.headers, data=data)
+        self.send_response(r.status_code)
+        for k, v in r.headers.items():
+            self.send_header(k, v)
+        self.end_headers()
+        if random.random() < 0.25 and len(r.content) > 1024*1024:
+            r.content = r.content[:len(r.content)//2]
+        self.wfile.write(r.content)
+        self.wfile.close()
+
+
+class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
+    """Handle requests in a separate thread."""
+
+
+httpd = ThreadedHTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler)
+httpd.serve_forever()
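The proxy forwards every request to minio1:9001 and, with probability 0.25, cuts responses larger than 1 MiB in half, which is what forces the S3 disk to retry its reads. As a quick smoke check, the health endpoint can be probed the same way the test code below does with curl; a hypothetical snippet assuming the proxy is already running on localhost:8081:

    # Hypothetical smoke check: "/" should answer 200 with body "OK" (see do_GET above).
    import http.client

    conn = http.client.HTTPConnection("localhost", 8081)
    conn.request("GET", "/")
    resp = conn.getresponse()
    assert resp.status == 200 and resp.read() == b"OK"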
@@ -54,6 +54,7 @@ def cluster():
         logging.info("Starting cluster...")
         cluster.start()
         logging.info("Cluster started")
+        run_s3_mocks(cluster)

         yield cluster
     finally:
@@ -77,11 +78,17 @@ def generate_values(date_str, count, sign=1):
     return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data])


-def create_table(cluster, table_name, additional_settings=None):
+def create_table(cluster, table_name, **additional_settings):
     node = cluster.instances["node"]
+    settings = {
+        "storage_policy": "s3",
+        "old_parts_lifetime": 0,
+        "index_granularity": 512
+    }
+    settings.update(additional_settings)

-    create_table_statement = """
-        CREATE TABLE {} (
+    create_table_statement = f"""
+        CREATE TABLE {table_name} (
             dt Date,
             id Int64,
             data String,
@@ -89,19 +96,40 @@ def create_table(cluster, table_name, additional_settings=None):
         ) ENGINE=MergeTree()
         PARTITION BY dt
         ORDER BY (dt, id)
-        SETTINGS
-            storage_policy='s3',
-            old_parts_lifetime=0,
-            index_granularity=512
-        """.format(table_name)
-
-    if additional_settings:
-        create_table_statement += ","
-        create_table_statement += additional_settings
+        SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}"""

     node.query(create_table_statement)


+def run_s3_mocks(cluster):
+    logging.info("Starting s3 mocks")
+    mocks = (
+        ("unstable_proxy.py", "resolver", "8081"),
+    )
+    for mock_filename, container, port in mocks:
+        container_id = cluster.get_container_id(container)
+        current_dir = os.path.dirname(__file__)
+        cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_mocks", mock_filename), mock_filename)
+        cluster.exec_in_container(container_id, ["python", mock_filename, port], detach=True)
+
+    # Wait for S3 mocks to start
+    for mock_filename, container, port in mocks:
+        num_attempts = 100
+        for attempt in range(num_attempts):
+            ping_response = cluster.exec_in_container(cluster.get_container_id(container),
+                                                      ["curl", "-s", f"http://localhost:{port}/"], nothrow=True)
+            if ping_response != "OK":
+                if attempt == num_attempts - 1:
+                    assert ping_response == "OK", f'Expected "OK", but got "{ping_response}"'
+                else:
+                    time.sleep(1)
+            else:
+                logging.debug(f"mock {mock_filename} ({port}) answered {ping_response} on attempt {attempt}")
+                break
+
+    logging.info("S3 mocks started")
+
+
 def wait_for_delete_s3_objects(cluster, expected, timeout=30):
     minio = cluster.minio_client
     while timeout > 0:
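For illustration, the new SETTINGS clause is assembled by joining key=repr(value) pairs from the settings dict, so the defaults render as in this small standalone sketch (not part of the commit):

    # Sketch of the clause produced for the default settings.
    settings = {"storage_policy": "s3", "old_parts_lifetime": 0, "index_granularity": 512}
    clause = "SETTINGS " + ",".join(k + "=" + repr(v) for k, v in settings.items())
    print(clause)  # SETTINGS storage_policy='s3',old_parts_lifetime=0,index_granularity=512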
@@ -136,7 +164,7 @@ def drop_table(cluster):
     ]
 )
 def test_simple_insert_select(cluster, min_rows_for_wide_part, files_per_part):
-    create_table(cluster, "s3_test", additional_settings="min_rows_for_wide_part={}".format(min_rows_for_wide_part))
+    create_table(cluster, "s3_test", min_rows_for_wide_part=min_rows_for_wide_part)

     node = cluster.instances["node"]
     minio = cluster.minio_client
@@ -158,13 +186,12 @@ def test_simple_insert_select(cluster, min_rows_for_wide_part, files_per_part):
     "merge_vertical", [False, True]
 )
 def test_insert_same_partition_and_merge(cluster, merge_vertical):
-    settings = None
+    settings = {}
     if merge_vertical:
-        settings = """
-            vertical_merge_algorithm_min_rows_to_activate=0,
-            vertical_merge_algorithm_min_columns_to_activate=0
-        """
-    create_table(cluster, "s3_test", additional_settings=settings)
+        settings['vertical_merge_algorithm_min_rows_to_activate'] = 0
+        settings['vertical_merge_algorithm_min_columns_to_activate'] = 0
+
+    create_table(cluster, "s3_test", **settings)

     node = cluster.instances["node"]
     minio = cluster.minio_client
@@ -459,3 +486,13 @@ def test_s3_disk_restart_during_load(cluster):

     for thread in threads:
         thread.join()
+
+
+def test_s3_disk_reads_on_unstable_connection(cluster):
+    create_table(cluster, "s3_test", storage_policy='unstable_s3')
+    node = cluster.instances["node"]
+    node.query("INSERT INTO s3_test SELECT today(), *, toString(*) FROM system.numbers LIMIT 9000000")
+    for i in range(30):
+        print(f"Read sequence {i}")
+        assert node.query("SELECT sum(id) FROM s3_test").splitlines() == ["40499995500000"]
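The expected value in the assertion follows from the INSERT: system.numbers with LIMIT 9000000 yields ids 0 through 8999999, whose sum is 9000000 * 8999999 / 2 = 40499995500000. A one-line check:

    # Verify the constant used in the assertion above.
    assert sum(range(9000000)) == 9000000 * 8999999 // 2 == 40499995500000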