From a73d60bae5b49bf6b09e4acc05f59cecd528a007 Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Wed, 22 May 2024 18:35:28 +0200 Subject: [PATCH 1/4] tests for qps_limit_exceeded --- contrib/aws | 2 +- .../integration/helpers/s3_mocks/broken_s3.py | 40 +++- .../test_checking_s3_blobs_paranoid/test.py | 206 +++++++++--------- 3 files changed, 143 insertions(+), 105 deletions(-) diff --git a/contrib/aws b/contrib/aws index 2e12d7c6daf..b7ae6e5bf48 160000 --- a/contrib/aws +++ b/contrib/aws @@ -1 +1 @@ -Subproject commit 2e12d7c6dafa81311ee3d73ac6a178550ffa75be +Subproject commit b7ae6e5bf48fb4981f24476bdd187cd35df1e2c6 diff --git a/tests/integration/helpers/s3_mocks/broken_s3.py b/tests/integration/helpers/s3_mocks/broken_s3.py index 206f960293f..238b8aac112 100644 --- a/tests/integration/helpers/s3_mocks/broken_s3.py +++ b/tests/integration/helpers/s3_mocks/broken_s3.py @@ -165,11 +165,35 @@ class _ServerRuntime: '' "" "ExpectedError" - "mock s3 injected error" + "mock s3 injected unretryable error" "txfbd566d03042474888193-00608d7537" "" ) - request_handler.write_error(data) + request_handler.write_error(500, data) + + class SlowDownAction: + def inject_error(self, request_handler): + data = ( + '' + "" + "SlowDown" + "Slow Down." + "txfbd566d03042474888193-00608d7537" + "" + ) + request_handler.write_error(429, data) + + class QpsLimitExceededAction: + def inject_error(self, request_handler): + data = ( + '' + "" + "QpsLimitExceeded" + "Please reduce your request rate." + "txfbd566d03042474888193-00608d7537" + "" + ) + request_handler.write_error(429, data) class RedirectAction: def __init__(self, host="localhost", port=1): @@ -239,6 +263,10 @@ class _ServerRuntime: self.error_handler = _ServerRuntime.BrokenPipeAction() elif self.action == "redirect_to": self.error_handler = _ServerRuntime.RedirectAction(*self.action_args) + elif self.action == "slow_down": + self.error_handler = _ServerRuntime.SlowDownAction(*self.action_args) + elif self.action == "qps_limit_exceeded": + self.error_handler = _ServerRuntime.QpsLimitExceededAction(*self.action_args) else: self.error_handler = _ServerRuntime.Expected500ErrorAction() @@ -344,12 +372,12 @@ class RequestHandler(http.server.BaseHTTPRequestHandler): self.end_headers() self.wfile.write(b"Redirected") - def write_error(self, data, content_length=None): + def write_error(self, http_code, data, content_length=None): if content_length is None: content_length = len(data) self.log_message("write_error %s", data) self.read_all_input() - self.send_response(500) + self.send_response(http_code) self.send_header("Content-Type", "text/xml") self.send_header("Content-Length", str(content_length)) self.end_headers() @@ -418,7 +446,7 @@ class RequestHandler(http.server.BaseHTTPRequestHandler): path = [x for x in parts.path.split("/") if x] assert path[0] == "mock_settings", path if len(path) < 2: - return self.write_error("_mock_settings: wrong command") + return self.write_error(400, "_mock_settings: wrong command") if path[1] == "at_part_upload": params = urllib.parse.parse_qs(parts.query, keep_blank_values=False) @@ -477,7 +505,7 @@ class RequestHandler(http.server.BaseHTTPRequestHandler): self.log_message("reset") return self._ok() - return self.write_error("_mock_settings: wrong command") + return self.write_error(400, "_mock_settings: wrong command") def do_GET(self): if self.path == "/": diff --git a/tests/integration/test_checking_s3_blobs_paranoid/test.py b/tests/integration/test_checking_s3_blobs_paranoid/test.py index 22d6d263d23..97fc5de65e7 100644 --- a/tests/integration/test_checking_s3_blobs_paranoid/test.py +++ b/tests/integration/test_checking_s3_blobs_paranoid/test.py @@ -91,7 +91,7 @@ def get_multipart_counters(node, query_id, log_type="ExceptionWhileProcessing"): SELECT ProfileEvents['S3CreateMultipartUpload'], ProfileEvents['S3UploadPart'], - ProfileEvents['S3WriteRequestsErrors'], + ProfileEvents['S3WriteRequestsErrors'] + ProfileEvents['S3WriteRequestsThrottling'], FROM system.query_log WHERE query_id='{query_id}' AND type='{log_type}' @@ -148,7 +148,7 @@ def test_upload_s3_fail_create_multi_part_upload(cluster, broken_s3, compression ) assert "Code: 499" in error, error - assert "mock s3 injected error" in error, error + assert "mock s3 injected unretryable error" in error, error create_multipart, upload_parts, s3_errors = get_multipart_counters( node, insert_query_id @@ -190,7 +190,7 @@ def test_upload_s3_fail_upload_part_when_multi_part_upload( ) assert "Code: 499" in error, error - assert "mock s3 injected error" in error, error + assert "mock s3 injected unretryable error" in error, error create_multipart, upload_parts, s3_errors = get_multipart_counters( node, insert_query_id @@ -200,18 +200,28 @@ def test_upload_s3_fail_upload_part_when_multi_part_upload( assert s3_errors >= 2 -def test_when_s3_connection_refused_is_retried(cluster, broken_s3): +@pytest.mark.parametrize( + "action_and_message", [ + ("slow_down", "DB::Exception: Slow Down."), + ("qps_limit_exceeded", "DB::Exception: Please reduce your request rate."), + ("connection_refused", "Poco::Exception. Code: 1000, e.code() = 111, Connection refused"), + ], + ids=lambda x: x[0] +) +def test_when_error_is_retried(cluster, broken_s3, action_and_message): node = cluster.instances["node"] - broken_s3.setup_fake_multpartuploads() - broken_s3.setup_at_part_upload(count=3, after=2, action="connection_refused") + action, message = action_and_message - insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_CONNECTION_REFUSED_RETRIED" + broken_s3.setup_fake_multpartuploads() + broken_s3.setup_at_part_upload(count=3, after=2, action=action) + + insert_query_id = f"INSERT_INTO_TABLE_{action}_RETRIED" node.query( f""" INSERT INTO TABLE FUNCTION s3( - 'http://resolver:8083/root/data/test_when_s3_connection_refused_at_write_retried', + 'http://resolver:8083/root/data/test_when_{action}_retried', 'minio', 'minio123', 'CSV', auto, 'none' ) @@ -234,13 +244,13 @@ def test_when_s3_connection_refused_is_retried(cluster, broken_s3): assert upload_parts == 39 assert s3_errors == 3 - broken_s3.setup_at_part_upload(count=1000, after=2, action="connection_refused") - insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_CONNECTION_REFUSED_RETRIED_1" + broken_s3.setup_at_part_upload(count=1000, after=2, action=action) + insert_query_id = f"INSERT_INTO_TABLE_{action}_RETRIED_1" error = node.query_and_get_error( f""" INSERT INTO TABLE FUNCTION s3( - 'http://resolver:8083/root/data/test_when_s3_connection_refused_at_write_retried', + 'http://resolver:8083/root/data/test_when_{action}_retried', 'minio', 'minio123', 'CSV', auto, 'none' ) @@ -258,7 +268,79 @@ def test_when_s3_connection_refused_is_retried(cluster, broken_s3): assert "Code: 499" in error, error assert ( - "Poco::Exception. Code: 1000, e.code() = 111, Connection refused" in error + message in error + ), error + + +def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3): + node = cluster.instances["node"] + + broken_s3.setup_fake_multpartuploads() + broken_s3.setup_at_part_upload( + count=3, + after=2, + action="broken_pipe", + ) + + insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD" + node.query( + f""" + INSERT INTO + TABLE FUNCTION s3( + 'http://resolver:8083/root/data/test_when_s3_broken_pipe_at_upload_is_retried', + 'minio', 'minio123', + 'CSV', auto, 'none' + ) + SELECT + * + FROM system.numbers + LIMIT 1000000 + SETTINGS + s3_max_single_part_upload_size=100, + s3_min_upload_part_size=1000000, + s3_check_objects_after_upload=0 + """, + query_id=insert_query_id, + ) + + create_multipart, upload_parts, s3_errors = get_multipart_counters( + node, insert_query_id, log_type="QueryFinish" + ) + + assert create_multipart == 1 + assert upload_parts == 7 + assert s3_errors == 3 + + broken_s3.setup_at_part_upload( + count=1000, + after=2, + action="broken_pipe", + ) + insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD_1" + error = node.query_and_get_error( + f""" + INSERT INTO + TABLE FUNCTION s3( + 'http://resolver:8083/root/data/test_when_s3_broken_pipe_at_upload_is_retried', + 'minio', 'minio123', + 'CSV', auto, 'none' + ) + SELECT + * + FROM system.numbers + LIMIT 1000000 + SETTINGS + s3_max_single_part_upload_size=100, + s3_min_upload_part_size=1000000, + s3_check_objects_after_upload=0 + """, + query_id=insert_query_id, + ) + + assert "Code: 1000" in error, error + assert ( + "DB::Exception: Poco::Exception. Code: 1000, e.code() = 32, I/O error: Broken pipe" + in error ), error @@ -401,20 +483,20 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried( ) error = node.query_and_get_error( f""" - INSERT INTO - TABLE FUNCTION s3( - 'http://resolver:8083/root/data/test_when_s3_connection_reset_by_peer_at_create_mpu_retried', - 'minio', 'minio123', - 'CSV', auto, 'none' - ) - SELECT - * - FROM system.numbers - LIMIT 1000 - SETTINGS - s3_max_single_part_upload_size=100, - s3_min_upload_part_size=100, - s3_check_objects_after_upload=0 + INSERT INTO + TABLE FUNCTION s3( + 'http://resolver:8083/root/data/test_when_s3_connection_reset_by_peer_at_create_mpu_retried', + 'minio', 'minio123', + 'CSV', auto, 'none' + ) + SELECT + * + FROM system.numbers + LIMIT 1000 + SETTINGS + s3_max_single_part_upload_size=100, + s3_min_upload_part_size=100, + s3_check_objects_after_upload=0 """, query_id=insert_query_id, ) @@ -427,78 +509,6 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried( ), error -def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3): - node = cluster.instances["node"] - - broken_s3.setup_fake_multpartuploads() - broken_s3.setup_at_part_upload( - count=3, - after=2, - action="broken_pipe", - ) - - insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD" - node.query( - f""" - INSERT INTO - TABLE FUNCTION s3( - 'http://resolver:8083/root/data/test_when_s3_broken_pipe_at_upload_is_retried', - 'minio', 'minio123', - 'CSV', auto, 'none' - ) - SELECT - * - FROM system.numbers - LIMIT 1000000 - SETTINGS - s3_max_single_part_upload_size=100, - s3_min_upload_part_size=1000000, - s3_check_objects_after_upload=0 - """, - query_id=insert_query_id, - ) - - create_multipart, upload_parts, s3_errors = get_multipart_counters( - node, insert_query_id, log_type="QueryFinish" - ) - - assert create_multipart == 1 - assert upload_parts == 7 - assert s3_errors == 3 - - broken_s3.setup_at_part_upload( - count=1000, - after=2, - action="broken_pipe", - ) - insert_query_id = f"TEST_WHEN_S3_BROKEN_PIPE_AT_UPLOAD_1" - error = node.query_and_get_error( - f""" - INSERT INTO - TABLE FUNCTION s3( - 'http://resolver:8083/root/data/test_when_s3_broken_pipe_at_upload_is_retried', - 'minio', 'minio123', - 'CSV', auto, 'none' - ) - SELECT - * - FROM system.numbers - LIMIT 1000000 - SETTINGS - s3_max_single_part_upload_size=100, - s3_min_upload_part_size=1000000, - s3_check_objects_after_upload=0 - """, - query_id=insert_query_id, - ) - - assert "Code: 1000" in error, error - assert ( - "DB::Exception: Poco::Exception. Code: 1000, e.code() = 32, I/O error: Broken pipe" - in error - ), error - - def test_query_is_canceled_with_inf_retries(cluster, broken_s3): node = cluster.instances["node_with_inf_s3_retries"] From 52fe1fab97a5f39c99c33deb1054bf319fbbf230 Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Wed, 22 May 2024 16:46:02 +0000 Subject: [PATCH 2/4] Automatic style fix --- tests/integration/helpers/s3_mocks/broken_s3.py | 4 +++- .../test_checking_s3_blobs_paranoid/test.py | 14 ++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/tests/integration/helpers/s3_mocks/broken_s3.py b/tests/integration/helpers/s3_mocks/broken_s3.py index 238b8aac112..7d0127bc1c4 100644 --- a/tests/integration/helpers/s3_mocks/broken_s3.py +++ b/tests/integration/helpers/s3_mocks/broken_s3.py @@ -266,7 +266,9 @@ class _ServerRuntime: elif self.action == "slow_down": self.error_handler = _ServerRuntime.SlowDownAction(*self.action_args) elif self.action == "qps_limit_exceeded": - self.error_handler = _ServerRuntime.QpsLimitExceededAction(*self.action_args) + self.error_handler = _ServerRuntime.QpsLimitExceededAction( + *self.action_args + ) else: self.error_handler = _ServerRuntime.Expected500ErrorAction() diff --git a/tests/integration/test_checking_s3_blobs_paranoid/test.py b/tests/integration/test_checking_s3_blobs_paranoid/test.py index 97fc5de65e7..a7fe02b16de 100644 --- a/tests/integration/test_checking_s3_blobs_paranoid/test.py +++ b/tests/integration/test_checking_s3_blobs_paranoid/test.py @@ -201,12 +201,16 @@ def test_upload_s3_fail_upload_part_when_multi_part_upload( @pytest.mark.parametrize( - "action_and_message", [ + "action_and_message", + [ ("slow_down", "DB::Exception: Slow Down."), ("qps_limit_exceeded", "DB::Exception: Please reduce your request rate."), - ("connection_refused", "Poco::Exception. Code: 1000, e.code() = 111, Connection refused"), + ( + "connection_refused", + "Poco::Exception. Code: 1000, e.code() = 111, Connection refused", + ), ], - ids=lambda x: x[0] + ids=lambda x: x[0], ) def test_when_error_is_retried(cluster, broken_s3, action_and_message): node = cluster.instances["node"] @@ -267,9 +271,7 @@ def test_when_error_is_retried(cluster, broken_s3, action_and_message): ) assert "Code: 499" in error, error - assert ( - message in error - ), error + assert message in error, error def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3): From 6be79a35b6a55e88103056058ce9833ac62be77e Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Wed, 22 May 2024 20:30:19 +0200 Subject: [PATCH 3/4] update contrib/aws to the last head --- contrib/aws | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/aws b/contrib/aws index b7ae6e5bf48..eb96e740453 160000 --- a/contrib/aws +++ b/contrib/aws @@ -1 +1 @@ -Subproject commit b7ae6e5bf48fb4981f24476bdd187cd35df1e2c6 +Subproject commit eb96e740453ae27afa1f367ba19f99bdcb38484d From c150c20512afef6ae816606f197b1ab0a2160712 Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Thu, 23 May 2024 13:53:36 +0200 Subject: [PATCH 4/4] adjust tests in test_merge_tree_s3 --- tests/integration/test_merge_tree_s3/test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py index 9216b08f942..0bf81e81383 100644 --- a/tests/integration/test_merge_tree_s3/test.py +++ b/tests/integration/test_merge_tree_s3/test.py @@ -857,9 +857,9 @@ def test_merge_canceled_by_s3_errors(cluster, broken_s3, node_name, storage_poli error = node.query_and_get_error( "OPTIMIZE TABLE test_merge_canceled_by_s3_errors FINAL", ) - assert "ExpectedError Message: mock s3 injected error" in error, error + assert "ExpectedError Message: mock s3 injected unretryable error" in error, error - node.wait_for_log_line("ExpectedError Message: mock s3 injected error") + node.wait_for_log_line("ExpectedError Message: mock s3 injected unretryable error") table_uuid = node.query( "SELECT uuid FROM system.tables WHERE database = 'default' AND name = 'test_merge_canceled_by_s3_errors' LIMIT 1" @@ -867,7 +867,7 @@ def test_merge_canceled_by_s3_errors(cluster, broken_s3, node_name, storage_poli node.query("SYSTEM FLUSH LOGS") error_count_in_blob_log = node.query( - f"SELECT count() FROM system.blob_storage_log WHERE query_id like '{table_uuid}::%' AND error like '%mock s3 injected error%'" + f"SELECT count() FROM system.blob_storage_log WHERE query_id like '{table_uuid}::%' AND error like '%mock s3 injected unretryable error%'" ).strip() assert int(error_count_in_blob_log) > 0, node.query( f"SELECT * FROM system.blob_storage_log WHERE query_id like '{table_uuid}::%' FORMAT PrettyCompactMonoBlock" @@ -911,7 +911,7 @@ def test_merge_canceled_by_s3_errors_when_move(cluster, broken_s3, node_name): node.query("OPTIMIZE TABLE merge_canceled_by_s3_errors_when_move FINAL") - node.wait_for_log_line("ExpectedError Message: mock s3 injected error") + node.wait_for_log_line("ExpectedError Message: mock s3 injected unretryable error") count = node.query("SELECT count() FROM merge_canceled_by_s3_errors_when_move") assert int(count) == 2000, count