Merge pull request #27176 from excitoon-favorites/s3unstablereadstest

S3 disk unstable reads test
alexey-milovidov 2021-08-23 13:27:55 +03:00 committed by GitHub
commit 983aab8818
3 changed files with 134 additions and 19 deletions

tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml

@@ -8,6 +8,13 @@
             <secret_access_key>minio123</secret_access_key>
             <s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
         </s3>
+        <unstable_s3>
+            <type>s3</type>
+            <endpoint>http://resolver:8081/root/data/</endpoint>
+            <access_key_id>minio</access_key_id>
+            <secret_access_key>minio123</secret_access_key>
+            <s3_max_single_read_retries>10</s3_max_single_read_retries>
+        </unstable_s3>
         <hdd>
             <type>local</type>
             <path>/</path>
@@ -24,6 +31,13 @@
                 </external>
             </volumes>
         </s3>
+        <unstable_s3>
+            <volumes>
+                <main>
+                    <disk>unstable_s3</disk>
+                </main>
+            </volumes>
+        </unstable_s3>
     </policies>
</storage_configuration>
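
The unstable_s3 disk is identical to the s3 disk except that it points at the mock proxy below (resolver:8081) instead of MinIO directly, and s3_max_single_read_retries gives each read up to 10 attempts to get past an injected truncation. As a sanity-check sketch (not part of this diff; it assumes the cluster fixture from the test file below), one could confirm the server picked up the new disk and policy via the standard system tables:

# Hypothetical check, not in this PR: the disk and policy from
# storage_conf.xml should be visible in the system tables.
node = cluster.instances["node"]
assert "unstable_s3" in node.query("SELECT name FROM system.disks").splitlines()
assert "unstable_s3" in node.query("SELECT policy_name FROM system.storage_policies").splitlines()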

tests/integration/test_merge_tree_s3/s3_mocks/unstable_proxy.py

@@ -0,0 +1,64 @@
import http.client
import http.server
import random
import socketserver
import sys
import urllib.parse


UPSTREAM_HOST = "minio1:9001"
random.seed("Unstable proxy/1.0")


def request(command, url, headers={}, data=None):
    """ Mini-requests. """
    class Dummy:
        pass

    parts = urllib.parse.urlparse(url)
    c = http.client.HTTPConnection(parts.hostname, parts.port)
    c.request(command, urllib.parse.urlunparse(parts._replace(scheme='', netloc='')), headers=headers, body=data)
    r = c.getresponse()
    result = Dummy()
    result.status_code = r.status
    result.headers = r.headers
    result.content = r.read()
    return result


class RequestHandler(http.server.BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/":
            # Health-check endpoint polled by run_s3_mocks() in the test.
            self.send_response(200)
            self.send_header("Content-Type", "text/plain")
            self.end_headers()
            self.wfile.write(b"OK")
        else:
            self.do_HEAD()

    def do_PUT(self):
        self.do_HEAD()

    def do_POST(self):
        self.do_HEAD()

    def do_HEAD(self):
        # Forward the request verbatim to MinIO and relay status and headers.
        content_length = self.headers.get("Content-Length")
        data = self.rfile.read(int(content_length)) if content_length else None
        r = request(self.command, f"http://{UPSTREAM_HOST}{self.path}", headers=self.headers, data=data)
        self.send_response(r.status_code)
        for k, v in r.headers.items():
            self.send_header(k, v)
        self.end_headers()
        # Simulate an unstable connection: truncate ~25% of large responses
        # to half their size while keeping the original Content-Length.
        if random.random() < 0.25 and len(r.content) > 1024*1024:
            r.content = r.content[:len(r.content)//2]
        self.wfile.write(r.content)
        self.wfile.close()


class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """Handle requests in a separate thread."""


httpd = ThreadedHTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler)
httpd.serve_forever()
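
Note that the proxy relays the upstream headers, including the original Content-Length, before halving the body, so the client observes a short read rather than an HTTP error. A minimal sketch of that failure mode from the client side (host, port, and object key are illustrative):

import http.client

# ~25% of >1 MiB responses arrive halved while Content-Length still
# advertises the full size, so read() comes up short.
conn = http.client.HTTPConnection("resolver", 8081)
conn.request("GET", "/root/data/some_large_object")  # hypothetical key
resp = conn.getresponse()
expected = int(resp.getheader("Content-Length"))
try:
    body = resp.read()
except http.client.IncompleteRead as e:  # raised on a truncated body
    body = e.partial
print(f"received {len(body)} of {expected} bytes")

With s3_max_single_read_retries=10, the probability that every attempt for one large read hits the 25% truncation is roughly 0.25**10, about 1e-6 (treating attempts as independent), which is why the repeated reads in the new test below are expected to always succeed.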

tests/integration/test_merge_tree_s3/test_merge_tree_s3.py

@@ -54,6 +54,7 @@ def cluster():
         logging.info("Starting cluster...")
         cluster.start()
         logging.info("Cluster started")
+        run_s3_mocks(cluster)

         yield cluster
     finally:
@@ -77,11 +78,17 @@ def generate_values(date_str, count, sign=1):
     return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data])


-def create_table(cluster, table_name, additional_settings=None):
+def create_table(cluster, table_name, **additional_settings):
     node = cluster.instances["node"]

+    settings = {
+        "storage_policy": "s3",
+        "old_parts_lifetime": 0,
+        "index_granularity": 512
+    }
+    settings.update(additional_settings)
+
-    create_table_statement = """
-        CREATE TABLE {} (
+    create_table_statement = f"""
+        CREATE TABLE {table_name} (
             dt Date,
             id Int64,
             data String,
@@ -89,19 +96,40 @@ def create_table(cluster, table_name, additional_settings=None):
         ) ENGINE=MergeTree()
         PARTITION BY dt
         ORDER BY (dt, id)
-        SETTINGS
-            storage_policy='s3',
-            old_parts_lifetime=0,
-            index_granularity=512
-        """.format(table_name)
-
-    if additional_settings:
-        create_table_statement += ","
-        create_table_statement += additional_settings
+        SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}"""

     node.query(create_table_statement)


+def run_s3_mocks(cluster):
+    logging.info("Starting s3 mocks")
+    mocks = (
+        ("unstable_proxy.py", "resolver", "8081"),
+    )
+    for mock_filename, container, port in mocks:
+        container_id = cluster.get_container_id(container)
+        current_dir = os.path.dirname(__file__)
+        cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_mocks", mock_filename), mock_filename)
+        cluster.exec_in_container(container_id, ["python", mock_filename, port], detach=True)
+
+    # Wait for S3 mocks to start
+    for mock_filename, container, port in mocks:
+        num_attempts = 100
+        for attempt in range(num_attempts):
+            ping_response = cluster.exec_in_container(cluster.get_container_id(container),
+                                                      ["curl", "-s", f"http://localhost:{port}/"], nothrow=True)
+            if ping_response != "OK":
+                if attempt == num_attempts - 1:
+                    assert ping_response == "OK", f'Expected "OK", but got "{ping_response}"'
+                else:
+                    time.sleep(1)
+            else:
+                logging.debug(f"mock {mock_filename} ({port}) answered {ping_response} on attempt {attempt}")
+                break
+
+    logging.info("S3 mocks started")
+
+
 def wait_for_delete_s3_objects(cluster, expected, timeout=30):
     minio = cluster.minio_client
     while timeout > 0:
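
The rewritten create_table now renders all settings in a single SETTINGS clause. A small sketch of what the join produces, using the defaults above plus an override passed through **additional_settings:

# Sketch of the SETTINGS rendering in the new create_table().
settings = {"storage_policy": "s3", "old_parts_lifetime": 0, "index_granularity": 512}
settings.update({"storage_policy": "unstable_s3"})  # e.g. from **additional_settings
print(",".join(k + "=" + repr(v) for k, v in settings.items()))
# storage_policy='unstable_s3',old_parts_lifetime=0,index_granularity=512

Since repr() quotes strings and leaves integers bare, each value lands in the generated SQL in valid form without per-type formatting.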
@@ -136,7 +164,7 @@ def drop_table(cluster):
     ]
 )
 def test_simple_insert_select(cluster, min_rows_for_wide_part, files_per_part):
-    create_table(cluster, "s3_test", additional_settings="min_rows_for_wide_part={}".format(min_rows_for_wide_part))
+    create_table(cluster, "s3_test", min_rows_for_wide_part=min_rows_for_wide_part)

     node = cluster.instances["node"]
     minio = cluster.minio_client
@@ -158,13 +186,12 @@ def test_simple_insert_select(cluster, min_rows_for_wide_part, files_per_part):
     "merge_vertical", [False, True]
 )
 def test_insert_same_partition_and_merge(cluster, merge_vertical):
-    settings = None
+    settings = {}
     if merge_vertical:
-        settings = """
-            vertical_merge_algorithm_min_rows_to_activate=0,
-            vertical_merge_algorithm_min_columns_to_activate=0
-        """
-    create_table(cluster, "s3_test", additional_settings=settings)
+        settings['vertical_merge_algorithm_min_rows_to_activate'] = 0
+        settings['vertical_merge_algorithm_min_columns_to_activate'] = 0
+    create_table(cluster, "s3_test", **settings)

     node = cluster.instances["node"]
     minio = cluster.minio_client
@@ -459,3 +486,13 @@ def test_s3_disk_restart_during_load(cluster):
     for thread in threads:
         thread.join()
+
+
+def test_s3_disk_reads_on_unstable_connection(cluster):
+    create_table(cluster, "s3_test", storage_policy='unstable_s3')
+    node = cluster.instances["node"]
+    node.query("INSERT INTO s3_test SELECT today(), *, toString(*) FROM system.numbers LIMIT 9000000")
+    for i in range(30):
+        print(f"Read sequence {i}")
+        assert node.query("SELECT sum(id) FROM s3_test").splitlines() == ["40499995500000"]
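
The expected result is the sum of ids 0 through 8999999, which pins the constant in the assertion:

# 0 + 1 + ... + 8999999 == n*(n-1)//2 for n = 9000000
assert sum(range(9000000)) == 9000000 * 8999999 // 2 == 40499995500000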