reuse s3_mocks, rewrite test test_paranoid_check_in_logs

Sema Checherinda 2023-05-31 21:23:01 +02:00
parent aedd3afb8a
commit 0aa30ef129
6 changed files with 175 additions and 129 deletions

View File

@@ -1,6 +1,7 @@
 import logging
 import os
 import time
+import importlib

 # Starts simple HTTP servers written in Python.
@@ -65,3 +66,28 @@ def start_mock_servers(cluster, script_dir, mocks, timeout=100):
         attempt += 1

     logging.info(f"Mock {server_names_with_desc} started")
+
+
+# The same as start_mock_servers, but
+# import servers from central directory tests/integration/helpers
+# and return the control instance
+def start_s3_mock(cluster, mock_name, port, timeout=100):
+    script_dir = os.path.join(os.path.dirname(__file__), "s3_mocks")
+    registered_servers = [
+        mock
+        for mock in os.listdir(script_dir)
+        if os.path.isfile(os.path.join(script_dir, mock))
+    ]
+
+    file_name = mock_name + ".py"
+    if file_name not in registered_servers:
+        raise KeyError(
+            f"Can't run s3 mock `{mock_name}`. No file `{file_name}` in directory `{script_dir}`"
+        )
+
+    start_mock_servers(cluster, script_dir, [(file_name, "resolver", port)], timeout)
+
+    fmt = importlib.import_module("." + mock_name, "helpers.s3_mocks")
+    control = getattr(fmt, "MockControl")(cluster, "resolver", port)
+
+    return control
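
Note: the returned control object is the mock's MockControl handle (defined in the next file), so a test module can start the server once and reset its state between tests. A minimal sketch of the intended use, mirroring the fixtures added later in this same commit:

    import pytest
    from helpers.mock_servers import start_s3_mock

    @pytest.fixture(scope="module")
    def init_broken_s3(cluster):
        # start s3_mocks/broken_s3.py inside the `resolver` container, once per module
        yield start_s3_mock(cluster, "broken_s3", "8083")

    @pytest.fixture(scope="function")
    def broken_s3(init_broken_s3):
        # wipe any failure rules left over from the previous test
        init_broken_s3.reset()
        yield init_broken_s3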

View File

@@ -12,7 +12,75 @@ UPSTREAM_HOST = "minio1"
 UPSTREAM_PORT = 9001


-class ServerRuntime:
+class MockControl:
+    def __init__(self, cluster, container, port):
+        self._cluster = cluster
+        self._container = container
+        self._port = port
+
+    def reset(self):
+        response = self._cluster.exec_in_container(
+            self._cluster.get_container_id(self._container),
+            [
+                "curl",
+                "-s",
+                f"http://localhost:{self._port}/mock_settings/reset",
+            ],
+            nothrow=True,
+        )
+        assert response == "OK"
+
+    def setup_fail_upload(self, part_length):
+        response = self._cluster.exec_in_container(
+            self._cluster.get_container_id(self._container),
+            [
+                "curl",
+                "-s",
+                f"http://localhost:{self._port}/mock_settings/error_at_put?when_length_bigger={part_length}",
+            ],
+            nothrow=True,
+        )
+        assert response == "OK"
+
+    def setup_fake_upload(self, part_length):
+        response = self._cluster.exec_in_container(
+            self._cluster.get_container_id(self._container),
+            [
+                "curl",
+                "-s",
+                f"http://localhost:{self._port}/mock_settings/fake_put?when_length_bigger={part_length}",
+            ],
+            nothrow=True,
+        )
+        assert response == "OK"
+
+    def setup_slow_answers(
+        self, minimal_length=0, timeout=None, probability=None, count=None
+    ):
+        url = (
+            f"http://localhost:{self._port}/"
+            f"mock_settings/slow_put"
+            f"?minimal_length={minimal_length}"
+        )
+
+        if timeout is not None:
+            url += f"&timeout={timeout}"
+
+        if probability is not None:
+            url += f"&probability={probability}"
+
+        if count is not None:
+            url += f"&count={count}"
+
+        response = self._cluster.exec_in_container(
+            self._cluster.get_container_id(self._container),
+            ["curl", "-s", url],
+            nothrow=True,
+        )
+        assert response == "OK"
+
+
+class _ServerRuntime:
     class SlowPut:
         def __init__(
             self, probability_=None, timeout_=None, minimal_length_=None, count_=None
@@ -34,11 +102,11 @@ class ServerRuntime:
             if content_length > self.minimal_length:
                 if self.count > 0:
                     if (
-                        runtime.slow_put.probability == 1
-                        or random.random() <= runtime.slow_put.probability
+                        _runtime.slow_put.probability == 1
+                        or random.random() <= _runtime.slow_put.probability
                     ):
                         self.count -= 1
-                        return runtime.slow_put.timeout
+                        return _runtime.slow_put.timeout
             return None

     def __init__(self):
@@ -65,10 +133,10 @@ class ServerRuntime:
         self.slow_put = None


-runtime = ServerRuntime()
+_runtime = _ServerRuntime()


-def and_then(value, func):
+def _and_then(value, func):
     assert callable(func)
     return None if value is None else func(value)
@@ -153,28 +221,28 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
         if path[1] == "error_at_put":
             params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
-            runtime.error_at_put_when_length_bigger = int(
+            _runtime.error_at_put_when_length_bigger = int(
                 params.get("when_length_bigger", [1024 * 1024])[0]
             )
             return self._ok()

         if path[1] == "fake_put":
             params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
-            runtime.fake_put_when_length_bigger = int(
+            _runtime.fake_put_when_length_bigger = int(
                 params.get("when_length_bigger", [1024 * 1024])[0]
             )
             return self._ok()

         if path[1] == "slow_put":
             params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
-            runtime.slow_put = ServerRuntime.SlowPut(
-                minimal_length_=and_then(params.get("minimal_length", [None])[0], int),
-                probability_=and_then(params.get("probability", [None])[0], float),
-                timeout_=and_then(params.get("timeout", [None])[0], float),
-                count_=and_then(params.get("count", [None])[0], int),
+            _runtime.slow_put = _ServerRuntime.SlowPut(
+                minimal_length_=_and_then(params.get("minimal_length", [None])[0], int),
+                probability_=_and_then(params.get("probability", [None])[0], float),
+                timeout_=_and_then(params.get("timeout", [None])[0], float),
+                count_=_and_then(params.get("count", [None])[0], int),
             )
-            self.log_message("set slow put %s", runtime.slow_put)
+            self.log_message("set slow put %s", _runtime.slow_put)
             return self._ok()

         if path[1] == "reset":
-            runtime.reset()
+            _runtime.reset()
             return self._ok()
         return self._error("_mock_settings: wrong command")
@@ -191,14 +259,14 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
     def do_PUT(self):
         content_length = int(self.headers.get("Content-Length", 0))

-        if runtime.slow_put is not None:
-            timeout = runtime.slow_put.get_timeout(content_length)
+        if _runtime.slow_put is not None:
+            timeout = _runtime.slow_put.get_timeout(content_length)
             if timeout is not None:
                 self.log_message("slow put %s", timeout)
                 time.sleep(timeout)

-        if runtime.error_at_put_when_length_bigger is not None:
-            if content_length > runtime.error_at_put_when_length_bigger:
+        if _runtime.error_at_put_when_length_bigger is not None:
+            if content_length > _runtime.error_at_put_when_length_bigger:
                 return self._error(
                     '<?xml version="1.0" encoding="UTF-8"?>'
                     "<Error>"
@@ -211,9 +279,10 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
         parts = urllib.parse.urlsplit(self.path)
         params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
         upload_id = params.get("uploadId", [None])[0]
-        if runtime.fake_put_when_length_bigger is not None and upload_id is not None:
-            if content_length > runtime.fake_put_when_length_bigger:
-                runtime.register_fake_upload(upload_id, parts.path)
+        if _runtime.fake_put_when_length_bigger is not None:
+            if content_length > _runtime.fake_put_when_length_bigger:
+                if upload_id is not None:
+                    _runtime.register_fake_upload(upload_id, parts.path)
                 return self._fake_put_ok()

         return self._redirect()
@@ -223,7 +292,7 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
         params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
         upload_id = params.get("uploadId", [None])[0]
-        if runtime.is_fake_upload(upload_id, parts.path):
+        if _runtime.is_fake_upload(upload_id, parts.path):
             return self._fake_post_ok(parts.path)

         return self._redirect()
@@ -235,9 +304,10 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
         self._redirect()


-class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
+class _ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
     """Handle requests in a separate thread."""


-httpd = ThreadedHTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler)
-httpd.serve_forever()
+if __name__ == "__main__":
+    httpd = _ThreadedHTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler)
+    httpd.serve_forever()
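
Note: the mock's control plane is a set of plain HTTP GET endpoints under /mock_settings, which is why MockControl can drive it with curl from inside the resolver container. A hedged sketch of the same calls made directly from Python (it assumes the mock at resolver:8083 is reachable from the caller; error handling omitted):

    # Sketch only: the urllib equivalent of MockControl's curl commands.
    import urllib.request

    def mock_setting(command):
        # every /mock_settings endpoint answers with the body "OK"
        with urllib.request.urlopen(f"http://resolver:8083/mock_settings/{command}") as r:
            assert r.read().decode() == "OK"

    mock_setting("reset")  # drop all configured failure rules
    mock_setting("error_at_put?when_length_bigger=50000")  # fail PUTs over 50000 bytes
    mock_setting("slow_put?minimal_length=1048576&timeout=15&count=10")  # delay ten large PUTs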

View File

@@ -7,25 +7,25 @@
     <storage_configuration>
         <disks>
-            <s3>
+            <broken_s3>
                 <type>s3</type>
-                <endpoint>http://minio1:9001/root/data/</endpoint>
+                <endpoint>http://resolver:8083/root/data/</endpoint>
                 <access_key_id>minio</access_key_id>
                 <secret_access_key>minio123</secret_access_key>
-            </s3>
+            </broken_s3>
         </disks>
         <policies>
-            <s3>
+            <broken_s3>
                 <volumes>
                     <main>
-                        <disk>s3</disk>
+                        <disk>broken_s3</disk>
                     </main>
                 </volumes>
-            </s3>
+            </broken_s3>
         </policies>
     </storage_configuration>
     <merge_tree>
-        <storage_policy>s3</storage_policy>
+        <storage_policy>broken_s3</storage_policy>
     </merge_tree>
 </clickhouse>
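
Note: the <merge_tree> override makes broken_s3 the default storage policy for every MergeTree table on the node, and the renamed disk now points at the mock (resolver:8083) instead of MinIO directly. A test table therefore needs no explicit SETTINGS clause to route its S3 traffic through the mock; a minimal sketch (the ORDER BY key is an assumption, since the hunks below truncate before it):

    # Sketch: a plain MergeTree table now writes through the broken_s3 disk.
    node.query(
        """
        CREATE TABLE s3_upload_after_check_works (
            id Int64,
            data String
        ) ENGINE=MergeTree()
        ORDER BY id  -- assumed ordering key, not shown in the diff
        """
    )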

View File

@@ -1,12 +1,10 @@
 #!/usr/bin/env python3
 import logging
 import os
-import time

-import pytest
 from helpers.cluster import ClickHouseCluster
+import pytest
+from helpers.mock_servers import start_s3_mock


 @pytest.fixture(scope="module")
@@ -32,12 +30,23 @@ def cluster():
         cluster.shutdown()


-def test_paranoid_check_in_logs(cluster):
+@pytest.fixture(scope="module")
+def init_broken_s3(cluster):
+    yield start_s3_mock(cluster, "broken_s3", "8083")
+
+
+@pytest.fixture(scope="function")
+def broken_s3(init_broken_s3):
+    init_broken_s3.reset()
+    yield init_broken_s3
+
+
+def test_upload_after_check_works(cluster, broken_s3):
     node = cluster.instances["node"]

     node.query(
         """
-        CREATE TABLE s3_failover_test (
+        CREATE TABLE s3_upload_after_check_works (
             id Int64,
             data String
         ) ENGINE=MergeTree()
@@ -45,8 +54,12 @@ def test_paranoid_check_in_logs(cluster):
         """
     )

-    node.query("INSERT INTO s3_failover_test VALUES (1, 'Hello')")
+    broken_s3.setup_fake_upload(1)

-    assert node.contains_in_log("exists after upload")
+    error = node.query_and_get_error(
+        "INSERT INTO s3_upload_after_check_works VALUES (1, 'Hello')"
+    )

-    assert node.query("SELECT * FROM s3_failover_test ORDER BY id") == "1\tHello\n"
+    assert "Code: 499" in error, error
+    assert "Immediately after upload" in error, error
+    assert "suddenly disappeared" in error, error

View File

@@ -4,12 +4,11 @@ import os
 import pytest
 from helpers.cluster import ClickHouseCluster
-from helpers.mock_servers import start_mock_servers
+from helpers.mock_servers import start_s3_mock, start_mock_servers
 from helpers.utility import generate_values, replace_config, SafeThread
 from helpers.wait_for_helpers import wait_for_delete_inactive_parts
 from helpers.wait_for_helpers import wait_for_delete_empty_parts
 from helpers.wait_for_helpers import wait_for_merges
-from helpers.test_tools import assert_eq_with_retry

 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -94,6 +93,17 @@ def create_table(node, table_name, **additional_settings):
     node.query(create_table_statement)


+@pytest.fixture(scope="module")
+def init_broken_s3(cluster):
+    yield start_s3_mock(cluster, "broken_s3", "8083")
+
+
+@pytest.fixture(scope="function")
+def broken_s3(init_broken_s3):
+    init_broken_s3.reset()
+    yield init_broken_s3
+
+
 def run_s3_mocks(cluster):
     script_dir = os.path.join(os.path.dirname(__file__), "s3_mocks")
     start_mock_servers(
@@ -102,7 +112,6 @@ def run_s3_mocks(cluster):
         [
             ("unstable_proxy.py", "resolver", "8081"),
             ("no_delete_objects.py", "resolver", "8082"),
-            ("broken_s3.py", "resolver", "8083"),
         ],
     )
@@ -142,80 +151,6 @@ def clear_minio(cluster):
     yield


-class BrokenS3:
-    @staticmethod
-    def reset(cluster):
-        response = cluster.exec_in_container(
-            cluster.get_container_id("resolver"),
-            [
-                "curl",
-                "-s",
-                f"http://localhost:8083/mock_settings/reset",
-            ],
-            nothrow=True,
-        )
-        assert response == "OK"
-
-    @staticmethod
-    def setup_fail_upload(cluster, part_length):
-        response = cluster.exec_in_container(
-            cluster.get_container_id("resolver"),
-            [
-                "curl",
-                "-s",
-                f"http://localhost:8083/mock_settings/error_at_put?when_length_bigger={part_length}",
-            ],
-            nothrow=True,
-        )
-        assert response == "OK"
-
-    @staticmethod
-    def setup_fake_upload(cluster, part_length):
-        response = cluster.exec_in_container(
-            cluster.get_container_id("resolver"),
-            [
-                "curl",
-                "-s",
-                f"http://localhost:8083/mock_settings/fake_put?when_length_bigger={part_length}",
-            ],
-            nothrow=True,
-        )
-        assert response == "OK"
-
-    @staticmethod
-    def setup_slow_answers(
-        cluster, minimal_length=0, timeout=None, probability=None, count=None
-    ):
-        url = (
-            f"http://localhost:8083/"
-            f"mock_settings/slow_put"
-            f"?minimal_length={minimal_length}"
-        )
-
-        if timeout is not None:
-            url += f"&timeout={timeout}"
-
-        if probability is not None:
-            url += f"&probability={probability}"
-
-        if count is not None:
-            url += f"&count={count}"
-
-        response = cluster.exec_in_container(
-            cluster.get_container_id("resolver"),
-            ["curl", "-s", url],
-            nothrow=True,
-        )
-        assert response == "OK"
-
-
-@pytest.fixture(autouse=True, scope="function")
-def reset_broken_s3(cluster):
-    BrokenS3.reset(cluster)
-    yield
-

 def check_no_objects_after_drop(cluster, table_name="s3_test", node_name="node"):
     node = cluster.instances[node_name]
     node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
@@ -932,7 +867,7 @@ def test_merge_canceled_by_drop(cluster, node_name):
 @pytest.mark.parametrize("storage_policy", ["broken_s3_always_multi_part", "broken_s3"])
 @pytest.mark.parametrize("node_name", ["node"])
-def test_merge_canceled_by_s3_errors(cluster, node_name, storage_policy):
+def test_merge_canceled_by_s3_errors(cluster, broken_s3, node_name, storage_policy):
     node = cluster.instances[node_name]
     node.query("DROP TABLE IF EXISTS test_merge_canceled_by_s3_errors NO DELAY")
     node.query(
@@ -952,7 +887,7 @@ def test_merge_canceled_by_s3_errors(cluster, node_name, storage_policy):
     min_key = node.query("SELECT min(key) FROM test_merge_canceled_by_s3_errors")
     assert int(min_key) == 0, min_key

-    BrokenS3.setup_fail_upload(cluster, 50000)
+    broken_s3.setup_fail_upload(50000)

     node.query("SYSTEM START MERGES test_merge_canceled_by_s3_errors")
@@ -969,7 +904,7 @@
 @pytest.mark.parametrize("node_name", ["node"])
-def test_merge_canceled_by_s3_errors_when_move(cluster, node_name):
+def test_merge_canceled_by_s3_errors_when_move(cluster, broken_s3, node_name):
     node = cluster.instances[node_name]
     settings = {
         "storage_policy": "external_broken_s3",
@@ -995,7 +930,7 @@ def test_merge_canceled_by_s3_errors_when_move(cluster, node_name):
         settings={"materialize_ttl_after_modify": 0},
     )

-    BrokenS3.setup_fail_upload(cluster, 10000)
+    broken_s3.setup_fail_upload(10000)

     node.query("SYSTEM START MERGES merge_canceled_by_s3_errors_when_move")
@@ -1015,7 +950,9 @@
 @pytest.mark.parametrize(
     "in_flight_memory", [(10, 245918115), (5, 156786752), (1, 106426187)]
 )
-def test_s3_engine_heavy_write_check_mem(cluster, node_name, in_flight_memory):
+def test_s3_engine_heavy_write_check_mem(
+    cluster, broken_s3, node_name, in_flight_memory
+):
     in_flight = in_flight_memory[0]
     memory = in_flight_memory[1]
@@ -1029,8 +966,8 @@ def test_s3_engine_heavy_write_check_mem(cluster, node_name, in_flight_memory):
         " ENGINE S3('http://resolver:8083/root/data/test-upload.csv', 'minio', 'minio123', 'CSV')",
     )

-    BrokenS3.setup_fake_upload(cluster, 1000)
-    BrokenS3.setup_slow_answers(cluster, 10 * 1024 * 1024, timeout=15, count=10)
+    broken_s3.setup_fake_upload(1000)
+    broken_s3.setup_slow_answers(10 * 1024 * 1024, timeout=15, count=10)

     query_id = f"INSERT_INTO_S3_ENGINE_QUERY_ID_{in_flight}"
     node.query(
@@ -1058,7 +995,7 @@
 @pytest.mark.parametrize("node_name", ["node"])
-def test_s3_disk_heavy_write_check_mem(cluster, node_name):
+def test_s3_disk_heavy_write_check_mem(cluster, broken_s3, node_name):
     memory = 2279055040

     node = cluster.instances[node_name]
@@ -1075,8 +1012,8 @@ def test_s3_disk_heavy_write_check_mem(cluster, node_name):
     )
     node.query("SYSTEM STOP MERGES s3_test")

-    BrokenS3.setup_fake_upload(cluster, 1000)
-    BrokenS3.setup_slow_answers(cluster, 10 * 1024 * 1024, timeout=10, count=50)
+    broken_s3.setup_fake_upload(1000)
+    broken_s3.setup_slow_answers(10 * 1024 * 1024, timeout=10, count=50)

     query_id = f"INSERT_INTO_S3_DISK_QUERY_ID"
     node.query(