tests for qps_limit_exceeded

This commit is contained in:
Sema Checherinda 2024-05-22 18:35:28 +02:00
parent 0ebca16da0
commit a73d60bae5
3 changed files with 143 additions and 105 deletions

2
contrib/aws vendored

@ -1 +1 @@
Subproject commit 2e12d7c6dafa81311ee3d73ac6a178550ffa75be Subproject commit b7ae6e5bf48fb4981f24476bdd187cd35df1e2c6

View File

@ -165,11 +165,35 @@ class _ServerRuntime:
'<?xml version="1.0" encoding="UTF-8"?>' '<?xml version="1.0" encoding="UTF-8"?>'
"<Error>" "<Error>"
"<Code>ExpectedError</Code>" "<Code>ExpectedError</Code>"
"<Message>mock s3 injected error</Message>" "<Message>mock s3 injected unretryable error</Message>"
"<RequestId>txfbd566d03042474888193-00608d7537</RequestId>" "<RequestId>txfbd566d03042474888193-00608d7537</RequestId>"
"</Error>" "</Error>"
) )
request_handler.write_error(data) request_handler.write_error(500, data)
class SlowDownAction:
def inject_error(self, request_handler):
data = (
'<?xml version="1.0" encoding="UTF-8"?>'
"<Error>"
"<Code>SlowDown</Code>"
"<Message>Slow Down.</Message>"
"<RequestId>txfbd566d03042474888193-00608d7537</RequestId>"
"</Error>"
)
request_handler.write_error(429, data)
class QpsLimitExceededAction:
def inject_error(self, request_handler):
data = (
'<?xml version="1.0" encoding="UTF-8"?>'
"<Error>"
"<Code>QpsLimitExceeded</Code>"
"<Message>Please reduce your request rate.</Message>"
"<RequestId>txfbd566d03042474888193-00608d7537</RequestId>"
"</Error>"
)
request_handler.write_error(429, data)
class RedirectAction: class RedirectAction:
def __init__(self, host="localhost", port=1): def __init__(self, host="localhost", port=1):
@ -239,6 +263,10 @@ class _ServerRuntime:
self.error_handler = _ServerRuntime.BrokenPipeAction() self.error_handler = _ServerRuntime.BrokenPipeAction()
elif self.action == "redirect_to": elif self.action == "redirect_to":
self.error_handler = _ServerRuntime.RedirectAction(*self.action_args) self.error_handler = _ServerRuntime.RedirectAction(*self.action_args)
elif self.action == "slow_down":
self.error_handler = _ServerRuntime.SlowDownAction(*self.action_args)
elif self.action == "qps_limit_exceeded":
self.error_handler = _ServerRuntime.QpsLimitExceededAction(*self.action_args)
else: else:
self.error_handler = _ServerRuntime.Expected500ErrorAction() self.error_handler = _ServerRuntime.Expected500ErrorAction()
@ -344,12 +372,12 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
self.wfile.write(b"Redirected") self.wfile.write(b"Redirected")
def write_error(self, data, content_length=None): def write_error(self, http_code, data, content_length=None):
if content_length is None: if content_length is None:
content_length = len(data) content_length = len(data)
self.log_message("write_error %s", data) self.log_message("write_error %s", data)
self.read_all_input() self.read_all_input()
self.send_response(500) self.send_response(http_code)
self.send_header("Content-Type", "text/xml") self.send_header("Content-Type", "text/xml")
self.send_header("Content-Length", str(content_length)) self.send_header("Content-Length", str(content_length))
self.end_headers() self.end_headers()
@ -418,7 +446,7 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
path = [x for x in parts.path.split("/") if x] path = [x for x in parts.path.split("/") if x]
assert path[0] == "mock_settings", path assert path[0] == "mock_settings", path
if len(path) < 2: if len(path) < 2:
return self.write_error("_mock_settings: wrong command") return self.write_error(400, "_mock_settings: wrong command")
if path[1] == "at_part_upload": if path[1] == "at_part_upload":
params = urllib.parse.parse_qs(parts.query, keep_blank_values=False) params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
@ -477,7 +505,7 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
self.log_message("reset") self.log_message("reset")
return self._ok() return self._ok()
return self.write_error("_mock_settings: wrong command") return self.write_error(400, "_mock_settings: wrong command")
def do_GET(self): def do_GET(self):
if self.path == "/": if self.path == "/":

View File

@ -91,7 +91,7 @@ def get_multipart_counters(node, query_id, log_type="ExceptionWhileProcessing"):
SELECT SELECT
ProfileEvents['S3CreateMultipartUpload'], ProfileEvents['S3CreateMultipartUpload'],
ProfileEvents['S3UploadPart'], ProfileEvents['S3UploadPart'],
ProfileEvents['S3WriteRequestsErrors'], ProfileEvents['S3WriteRequestsErrors'] + ProfileEvents['S3WriteRequestsThrottling'],
FROM system.query_log FROM system.query_log
WHERE query_id='{query_id}' WHERE query_id='{query_id}'
AND type='{log_type}' AND type='{log_type}'
@ -148,7 +148,7 @@ def test_upload_s3_fail_create_multi_part_upload(cluster, broken_s3, compression
) )
assert "Code: 499" in error, error assert "Code: 499" in error, error
assert "mock s3 injected error" in error, error assert "mock s3 injected unretryable error" in error, error
create_multipart, upload_parts, s3_errors = get_multipart_counters( create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id node, insert_query_id
@ -190,7 +190,7 @@ def test_upload_s3_fail_upload_part_when_multi_part_upload(
) )
assert "Code: 499" in error, error assert "Code: 499" in error, error
assert "mock s3 injected error" in error, error assert "mock s3 injected unretryable error" in error, error
create_multipart, upload_parts, s3_errors = get_multipart_counters( create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id node, insert_query_id
@ -200,18 +200,28 @@ def test_upload_s3_fail_upload_part_when_multi_part_upload(
assert s3_errors >= 2 assert s3_errors >= 2
def test_when_s3_connection_refused_is_retried(cluster, broken_s3): @pytest.mark.parametrize(
"action_and_message", [
("slow_down", "DB::Exception: Slow Down."),
("qps_limit_exceeded", "DB::Exception: Please reduce your request rate."),
("connection_refused", "Poco::Exception. Code: 1000, e.code() = 111, Connection refused"),
],
ids=lambda x: x[0]
)
def test_when_error_is_retried(cluster, broken_s3, action_and_message):
node = cluster.instances["node"] node = cluster.instances["node"]
broken_s3.setup_fake_multpartuploads() action, message = action_and_message
broken_s3.setup_at_part_upload(count=3, after=2, action="connection_refused")
insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_CONNECTION_REFUSED_RETRIED" broken_s3.setup_fake_multpartuploads()
broken_s3.setup_at_part_upload(count=3, after=2, action=action)
insert_query_id = f"INSERT_INTO_TABLE_{action}_RETRIED"
node.query( node.query(
f""" f"""
INSERT INTO INSERT INTO
TABLE FUNCTION s3( TABLE FUNCTION s3(
'http://resolver:8083/root/data/test_when_s3_connection_refused_at_write_retried', 'http://resolver:8083/root/data/test_when_{action}_retried',
'minio', 'minio123', 'minio', 'minio123',
'CSV', auto, 'none' 'CSV', auto, 'none'
) )
@ -234,13 +244,13 @@ def test_when_s3_connection_refused_is_retried(cluster, broken_s3):
assert upload_parts == 39 assert upload_parts == 39
assert s3_errors == 3 assert s3_errors == 3
broken_s3.setup_at_part_upload(count=1000, after=2, action="connection_refused") broken_s3.setup_at_part_upload(count=1000, after=2, action=action)
insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_CONNECTION_REFUSED_RETRIED_1" insert_query_id = f"INSERT_INTO_TABLE_{action}_RETRIED_1"
error = node.query_and_get_error( error = node.query_and_get_error(
f""" f"""
INSERT INTO INSERT INTO
TABLE FUNCTION s3( TABLE FUNCTION s3(
'http://resolver:8083/root/data/test_when_s3_connection_refused_at_write_retried', 'http://resolver:8083/root/data/test_when_{action}_retried',
'minio', 'minio123', 'minio', 'minio123',
'CSV', auto, 'none' 'CSV', auto, 'none'
) )
@ -258,7 +268,79 @@ def test_when_s3_connection_refused_is_retried(cluster, broken_s3):
assert "Code: 499" in error, error assert "Code: 499" in error, error
assert ( assert (
"Poco::Exception. Code: 1000, e.code() = 111, Connection refused" in error message in error
), error
def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3):
node = cluster.instances["node"]
broken_s3.setup_fake_multpartuploads()
broken_s3.setup_at_part_upload(
count=3,
after=2,
action="broken_pipe",
)
insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD"
node.query(
f"""
INSERT INTO
TABLE FUNCTION s3(
'http://resolver:8083/root/data/test_when_s3_broken_pipe_at_upload_is_retried',
'minio', 'minio123',
'CSV', auto, 'none'
)
SELECT
*
FROM system.numbers
LIMIT 1000000
SETTINGS
s3_max_single_part_upload_size=100,
s3_min_upload_part_size=1000000,
s3_check_objects_after_upload=0
""",
query_id=insert_query_id,
)
create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id, log_type="QueryFinish"
)
assert create_multipart == 1
assert upload_parts == 7
assert s3_errors == 3
broken_s3.setup_at_part_upload(
count=1000,
after=2,
action="broken_pipe",
)
insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD_1"
error = node.query_and_get_error(
f"""
INSERT INTO
TABLE FUNCTION s3(
'http://resolver:8083/root/data/test_when_s3_broken_pipe_at_upload_is_retried',
'minio', 'minio123',
'CSV', auto, 'none'
)
SELECT
*
FROM system.numbers
LIMIT 1000000
SETTINGS
s3_max_single_part_upload_size=100,
s3_min_upload_part_size=1000000,
s3_check_objects_after_upload=0
""",
query_id=insert_query_id,
)
assert "Code: 1000" in error, error
assert (
"DB::Exception: Poco::Exception. Code: 1000, e.code() = 32, I/O error: Broken pipe"
in error
), error ), error
@ -427,78 +509,6 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried(
), error ), error
def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3):
node = cluster.instances["node"]
broken_s3.setup_fake_multpartuploads()
broken_s3.setup_at_part_upload(
count=3,
after=2,
action="broken_pipe",
)
insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD"
node.query(
f"""
INSERT INTO
TABLE FUNCTION s3(
'http://resolver:8083/root/data/test_when_s3_broken_pipe_at_upload_is_retried',
'minio', 'minio123',
'CSV', auto, 'none'
)
SELECT
*
FROM system.numbers
LIMIT 1000000
SETTINGS
s3_max_single_part_upload_size=100,
s3_min_upload_part_size=1000000,
s3_check_objects_after_upload=0
""",
query_id=insert_query_id,
)
create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id, log_type="QueryFinish"
)
assert create_multipart == 1
assert upload_parts == 7
assert s3_errors == 3
broken_s3.setup_at_part_upload(
count=1000,
after=2,
action="broken_pipe",
)
insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD_1"
error = node.query_and_get_error(
f"""
INSERT INTO
TABLE FUNCTION s3(
'http://resolver:8083/root/data/test_when_s3_broken_pipe_at_upload_is_retried',
'minio', 'minio123',
'CSV', auto, 'none'
)
SELECT
*
FROM system.numbers
LIMIT 1000000
SETTINGS
s3_max_single_part_upload_size=100,
s3_min_upload_part_size=1000000,
s3_check_objects_after_upload=0
""",
query_id=insert_query_id,
)
assert "Code: 1000" in error, error
assert (
"DB::Exception: Poco::Exception. Code: 1000, e.code() = 32, I/O error: Broken pipe"
in error
), error
def test_query_is_canceled_with_inf_retries(cluster, broken_s3): def test_query_is_canceled_with_inf_retries(cluster, broken_s3):
node = cluster.instances["node_with_inf_s3_retries"] node = cluster.instances["node_with_inf_s3_retries"]