diff --git a/contrib/aws b/contrib/aws
index 2e12d7c6daf..eb96e740453 160000
--- a/contrib/aws
+++ b/contrib/aws
@@ -1 +1 @@
-Subproject commit 2e12d7c6dafa81311ee3d73ac6a178550ffa75be
+Subproject commit eb96e740453ae27afa1f367ba19f99bdcb38484d
diff --git a/tests/integration/helpers/s3_mocks/broken_s3.py b/tests/integration/helpers/s3_mocks/broken_s3.py
index 206f960293f..7d0127bc1c4 100644
--- a/tests/integration/helpers/s3_mocks/broken_s3.py
+++ b/tests/integration/helpers/s3_mocks/broken_s3.py
@@ -165,11 +165,35 @@ class _ServerRuntime:
''
""
"ExpectedError
"
- "mock s3 injected error"
+ "mock s3 injected unretryable error"
"txfbd566d03042474888193-00608d7537"
""
)
- request_handler.write_error(data)
+ request_handler.write_error(500, data)
+
+ class SlowDownAction:
+ def inject_error(self, request_handler):
+ data = (
+ ''
+ ""
+ "SlowDown
"
+ "Slow Down."
+ "txfbd566d03042474888193-00608d7537"
+ ""
+ )
+ request_handler.write_error(429, data)
+
+ class QpsLimitExceededAction:
+ def inject_error(self, request_handler):
+ data = (
+ ''
+ ""
+ "QpsLimitExceeded
"
+ "Please reduce your request rate."
+ "txfbd566d03042474888193-00608d7537"
+ ""
+ )
+ request_handler.write_error(429, data)
class RedirectAction:
def __init__(self, host="localhost", port=1):
@@ -239,6 +263,12 @@ class _ServerRuntime:
self.error_handler = _ServerRuntime.BrokenPipeAction()
elif self.action == "redirect_to":
self.error_handler = _ServerRuntime.RedirectAction(*self.action_args)
+ elif self.action == "slow_down":
+ self.error_handler = _ServerRuntime.SlowDownAction(*self.action_args)
+ elif self.action == "qps_limit_exceeded":
+ self.error_handler = _ServerRuntime.QpsLimitExceededAction(
+ *self.action_args
+ )
else:
self.error_handler = _ServerRuntime.Expected500ErrorAction()
@@ -344,12 +374,12 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
self.end_headers()
self.wfile.write(b"Redirected")
- def write_error(self, data, content_length=None):
+ def write_error(self, http_code, data, content_length=None):
if content_length is None:
content_length = len(data)
self.log_message("write_error %s", data)
self.read_all_input()
- self.send_response(500)
+ self.send_response(http_code)
self.send_header("Content-Type", "text/xml")
self.send_header("Content-Length", str(content_length))
self.end_headers()
@@ -418,7 +448,7 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
path = [x for x in parts.path.split("/") if x]
assert path[0] == "mock_settings", path
if len(path) < 2:
- return self.write_error("_mock_settings: wrong command")
+ return self.write_error(400, "_mock_settings: wrong command")
if path[1] == "at_part_upload":
params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
@@ -477,7 +507,7 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
self.log_message("reset")
return self._ok()
- return self.write_error("_mock_settings: wrong command")
+ return self.write_error(400, "_mock_settings: wrong command")
def do_GET(self):
if self.path == "/":
diff --git a/tests/integration/test_checking_s3_blobs_paranoid/test.py b/tests/integration/test_checking_s3_blobs_paranoid/test.py
index 22d6d263d23..a7fe02b16de 100644
--- a/tests/integration/test_checking_s3_blobs_paranoid/test.py
+++ b/tests/integration/test_checking_s3_blobs_paranoid/test.py
@@ -91,7 +91,7 @@ def get_multipart_counters(node, query_id, log_type="ExceptionWhileProcessing"):
SELECT
ProfileEvents['S3CreateMultipartUpload'],
ProfileEvents['S3UploadPart'],
- ProfileEvents['S3WriteRequestsErrors'],
+ ProfileEvents['S3WriteRequestsErrors'] + ProfileEvents['S3WriteRequestsThrottling'],
FROM system.query_log
WHERE query_id='{query_id}'
AND type='{log_type}'
@@ -148,7 +148,7 @@ def test_upload_s3_fail_create_multi_part_upload(cluster, broken_s3, compression
)
assert "Code: 499" in error, error
- assert "mock s3 injected error" in error, error
+ assert "mock s3 injected unretryable error" in error, error
create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id
@@ -190,7 +190,7 @@ def test_upload_s3_fail_upload_part_when_multi_part_upload(
)
assert "Code: 499" in error, error
- assert "mock s3 injected error" in error, error
+ assert "mock s3 injected unretryable error" in error, error
create_multipart, upload_parts, s3_errors = get_multipart_counters(
node, insert_query_id
@@ -200,18 +200,32 @@ def test_upload_s3_fail_upload_part_when_multi_part_upload(
assert s3_errors >= 2
-def test_when_s3_connection_refused_is_retried(cluster, broken_s3):
+@pytest.mark.parametrize(
+ "action_and_message",
+ [
+ ("slow_down", "DB::Exception: Slow Down."),
+ ("qps_limit_exceeded", "DB::Exception: Please reduce your request rate."),
+ (
+ "connection_refused",
+ "Poco::Exception. Code: 1000, e.code() = 111, Connection refused",
+ ),
+ ],
+ ids=lambda x: x[0],
+)
+def test_when_error_is_retried(cluster, broken_s3, action_and_message):
node = cluster.instances["node"]
- broken_s3.setup_fake_multpartuploads()
- broken_s3.setup_at_part_upload(count=3, after=2, action="connection_refused")
+ action, message = action_and_message
- insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_CONNECTION_REFUSED_RETRIED"
+ broken_s3.setup_fake_multpartuploads()
+ broken_s3.setup_at_part_upload(count=3, after=2, action=action)
+
+ insert_query_id = f"INSERT_INTO_TABLE_{action}_RETRIED"
node.query(
f"""
INSERT INTO
TABLE FUNCTION s3(
- 'http://resolver:8083/root/data/test_when_s3_connection_refused_at_write_retried',
+ 'http://resolver:8083/root/data/test_when_{action}_retried',
'minio', 'minio123',
'CSV', auto, 'none'
)
@@ -234,13 +248,13 @@ def test_when_s3_connection_refused_is_retried(cluster, broken_s3):
assert upload_parts == 39
assert s3_errors == 3
- broken_s3.setup_at_part_upload(count=1000, after=2, action="connection_refused")
- insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_CONNECTION_REFUSED_RETRIED_1"
+ broken_s3.setup_at_part_upload(count=1000, after=2, action=action)
+ insert_query_id = f"INSERT_INTO_TABLE_{action}_RETRIED_1"
error = node.query_and_get_error(
f"""
INSERT INTO
TABLE FUNCTION s3(
- 'http://resolver:8083/root/data/test_when_s3_connection_refused_at_write_retried',
+ 'http://resolver:8083/root/data/test_when_{action}_retried',
'minio', 'minio123',
'CSV', auto, 'none'
)
@@ -257,8 +271,78 @@ def test_when_s3_connection_refused_is_retried(cluster, broken_s3):
)
assert "Code: 499" in error, error
+ assert message in error, error
+
+
+def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3):
+ node = cluster.instances["node"]
+
+ broken_s3.setup_fake_multpartuploads()
+ broken_s3.setup_at_part_upload(
+ count=3,
+ after=2,
+ action="broken_pipe",
+ )
+
+ insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD"
+ node.query(
+ f"""
+ INSERT INTO
+ TABLE FUNCTION s3(
+ 'http://resolver:8083/root/data/test_when_s3_broken_pipe_at_upload_is_retried',
+ 'minio', 'minio123',
+ 'CSV', auto, 'none'
+ )
+ SELECT
+ *
+ FROM system.numbers
+ LIMIT 1000000
+ SETTINGS
+ s3_max_single_part_upload_size=100,
+ s3_min_upload_part_size=1000000,
+ s3_check_objects_after_upload=0
+ """,
+ query_id=insert_query_id,
+ )
+
+ create_multipart, upload_parts, s3_errors = get_multipart_counters(
+ node, insert_query_id, log_type="QueryFinish"
+ )
+
+ assert create_multipart == 1
+ assert upload_parts == 7
+ assert s3_errors == 3
+
+ broken_s3.setup_at_part_upload(
+ count=1000,
+ after=2,
+ action="broken_pipe",
+ )
+ insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD_1"
+ error = node.query_and_get_error(
+ f"""
+ INSERT INTO
+ TABLE FUNCTION s3(
+ 'http://resolver:8083/root/data/test_when_s3_broken_pipe_at_upload_is_retried',
+ 'minio', 'minio123',
+ 'CSV', auto, 'none'
+ )
+ SELECT
+ *
+ FROM system.numbers
+ LIMIT 1000000
+ SETTINGS
+ s3_max_single_part_upload_size=100,
+ s3_min_upload_part_size=1000000,
+ s3_check_objects_after_upload=0
+ """,
+ query_id=insert_query_id,
+ )
+
+ assert "Code: 1000" in error, error
assert (
- "Poco::Exception. Code: 1000, e.code() = 111, Connection refused" in error
+ "DB::Exception: Poco::Exception. Code: 1000, e.code() = 32, I/O error: Broken pipe"
+ in error
), error
@@ -401,20 +485,20 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried(
)
error = node.query_and_get_error(
f"""
- INSERT INTO
- TABLE FUNCTION s3(
- 'http://resolver:8083/root/data/test_when_s3_connection_reset_by_peer_at_create_mpu_retried',
- 'minio', 'minio123',
- 'CSV', auto, 'none'
- )
- SELECT
- *
- FROM system.numbers
- LIMIT 1000
- SETTINGS
- s3_max_single_part_upload_size=100,
- s3_min_upload_part_size=100,
- s3_check_objects_after_upload=0
+ INSERT INTO
+ TABLE FUNCTION s3(
+ 'http://resolver:8083/root/data/test_when_s3_connection_reset_by_peer_at_create_mpu_retried',
+ 'minio', 'minio123',
+ 'CSV', auto, 'none'
+ )
+ SELECT
+ *
+ FROM system.numbers
+ LIMIT 1000
+ SETTINGS
+ s3_max_single_part_upload_size=100,
+ s3_min_upload_part_size=100,
+ s3_check_objects_after_upload=0
""",
query_id=insert_query_id,
)
@@ -427,78 +511,6 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried(
), error
-def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3):
- node = cluster.instances["node"]
-
- broken_s3.setup_fake_multpartuploads()
- broken_s3.setup_at_part_upload(
- count=3,
- after=2,
- action="broken_pipe",
- )
-
- insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD"
- node.query(
- f"""
- INSERT INTO
- TABLE FUNCTION s3(
- 'http://resolver:8083/root/data/test_when_s3_broken_pipe_at_upload_is_retried',
- 'minio', 'minio123',
- 'CSV', auto, 'none'
- )
- SELECT
- *
- FROM system.numbers
- LIMIT 1000000
- SETTINGS
- s3_max_single_part_upload_size=100,
- s3_min_upload_part_size=1000000,
- s3_check_objects_after_upload=0
- """,
- query_id=insert_query_id,
- )
-
- create_multipart, upload_parts, s3_errors = get_multipart_counters(
- node, insert_query_id, log_type="QueryFinish"
- )
-
- assert create_multipart == 1
- assert upload_parts == 7
- assert s3_errors == 3
-
- broken_s3.setup_at_part_upload(
- count=1000,
- after=2,
- action="broken_pipe",
- )
- insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD_1"
- error = node.query_and_get_error(
- f"""
- INSERT INTO
- TABLE FUNCTION s3(
- 'http://resolver:8083/root/data/test_when_s3_broken_pipe_at_upload_is_retried',
- 'minio', 'minio123',
- 'CSV', auto, 'none'
- )
- SELECT
- *
- FROM system.numbers
- LIMIT 1000000
- SETTINGS
- s3_max_single_part_upload_size=100,
- s3_min_upload_part_size=1000000,
- s3_check_objects_after_upload=0
- """,
- query_id=insert_query_id,
- )
-
- assert "Code: 1000" in error, error
- assert (
- "DB::Exception: Poco::Exception. Code: 1000, e.code() = 32, I/O error: Broken pipe"
- in error
- ), error
-
-
def test_query_is_canceled_with_inf_retries(cluster, broken_s3):
node = cluster.instances["node_with_inf_s3_retries"]
diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py
index 9216b08f942..0bf81e81383 100644
--- a/tests/integration/test_merge_tree_s3/test.py
+++ b/tests/integration/test_merge_tree_s3/test.py
@@ -857,9 +857,9 @@ def test_merge_canceled_by_s3_errors(cluster, broken_s3, node_name, storage_poli
error = node.query_and_get_error(
"OPTIMIZE TABLE test_merge_canceled_by_s3_errors FINAL",
)
- assert "ExpectedError Message: mock s3 injected error" in error, error
+ assert "ExpectedError Message: mock s3 injected unretryable error" in error, error
- node.wait_for_log_line("ExpectedError Message: mock s3 injected error")
+ node.wait_for_log_line("ExpectedError Message: mock s3 injected unretryable error")
table_uuid = node.query(
"SELECT uuid FROM system.tables WHERE database = 'default' AND name = 'test_merge_canceled_by_s3_errors' LIMIT 1"
@@ -867,7 +867,7 @@ def test_merge_canceled_by_s3_errors(cluster, broken_s3, node_name, storage_poli
node.query("SYSTEM FLUSH LOGS")
error_count_in_blob_log = node.query(
- f"SELECT count() FROM system.blob_storage_log WHERE query_id like '{table_uuid}::%' AND error like '%mock s3 injected error%'"
+ f"SELECT count() FROM system.blob_storage_log WHERE query_id like '{table_uuid}::%' AND error like '%mock s3 injected unretryable error%'"
).strip()
assert int(error_count_in_blob_log) > 0, node.query(
f"SELECT * FROM system.blob_storage_log WHERE query_id like '{table_uuid}::%' FORMAT PrettyCompactMonoBlock"
@@ -911,7 +911,7 @@ def test_merge_canceled_by_s3_errors_when_move(cluster, broken_s3, node_name):
node.query("OPTIMIZE TABLE merge_canceled_by_s3_errors_when_move FINAL")
- node.wait_for_log_line("ExpectedError Message: mock s3 injected error")
+ node.wait_for_log_line("ExpectedError Message: mock s3 injected unretryable error")
count = node.query("SELECT count() FROM merge_canceled_by_s3_errors_when_move")
assert int(count) == 2000, count