From 770a762317b88060f71bafdbff4f31a60e5ad8d1 Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Fri, 3 Nov 2023 19:01:33 +0100 Subject: [PATCH 01/19] aggressive timeout --- src/Backups/BackupIO_S3.cpp | 4 +++- src/Core/Settings.h | 1 + src/IO/ConnectionTimeouts.cpp | 20 ++++++++++++++++ src/IO/ConnectionTimeouts.h | 2 ++ src/IO/S3/Client.cpp | 1 + src/IO/S3/PocoHTTPClient.cpp | 43 +++++++++++++++++++++++++++++++---- src/IO/S3/PocoHTTPClient.h | 5 ++++ 7 files changed, 71 insertions(+), 5 deletions(-) diff --git a/src/Backups/BackupIO_S3.cpp b/src/Backups/BackupIO_S3.cpp index 8bb2f895e38..0b700665988 100644 --- a/src/Backups/BackupIO_S3.cpp +++ b/src/Backups/BackupIO_S3.cpp @@ -55,7 +55,9 @@ namespace static_cast(context->getGlobalContext()->getSettingsRef().s3_max_redirects), static_cast(context->getGlobalContext()->getSettingsRef().s3_retry_attempts), context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging, - /* for_disk_s3 = */ false, request_settings.get_request_throttler, request_settings.put_request_throttler, + /* for_disk_s3 = */ false, + request_settings.get_request_throttler, + request_settings.put_request_throttler, s3_uri.uri.getScheme()); client_configuration.endpointOverride = s3_uri.endpoint; diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 3b90a3e068b..b1459b6f328 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -94,6 +94,7 @@ class IColumn; M(UInt64, s3_max_put_rps, 0, "Limit on S3 PUT request per second rate before throttling. Zero means unlimited.", 0) \ M(UInt64, s3_max_put_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_put_rps`", 0) \ M(UInt64, s3_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \ + M(Bool, s3_aggressive_timeouts, true, "When aggressive timeouts are enabled first two attempts are made with low receive and send timeout", 0) \ M(UInt64, azure_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \ M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \ M(Bool, azure_truncate_on_insert, false, "Enables or disables truncate before insert in azure engine tables.", 0) \ diff --git a/src/IO/ConnectionTimeouts.cpp b/src/IO/ConnectionTimeouts.cpp index 01fbaa4f817..a9eebb1a755 100644 --- a/src/IO/ConnectionTimeouts.cpp +++ b/src/IO/ConnectionTimeouts.cpp @@ -133,4 +133,24 @@ ConnectionTimeouts ConnectionTimeouts::getHTTPTimeouts(const Settings & settings settings.http_receive_timeout); } +ConnectionTimeouts ConnectionTimeouts::aggressiveTimeouts(UInt32 attempt) const +{ + auto aggressive = *this; + + if (attempt == 2) + { + auto one_second = Poco::Timespan(1, 0); + aggressive.send_timeout = saturate(one_second, send_timeout); + aggressive.receive_timeout = saturate(one_second, receive_timeout); + } + else if (attempt == 1) + { + auto two_hundred_ms = Poco::Timespan(0, 200 * 1000); + aggressive.send_timeout = saturate(two_hundred_ms, send_timeout); + aggressive.receive_timeout = saturate(two_hundred_ms, receive_timeout); + } + + return aggressive; +} + } diff --git a/src/IO/ConnectionTimeouts.h b/src/IO/ConnectionTimeouts.h index 684af42827f..17ee1907d89 100644 --- a/src/IO/ConnectionTimeouts.h +++ b/src/IO/ConnectionTimeouts.h @@ -67,6 +67,8 @@ struct ConnectionTimeouts /// Timeouts for the case when we will try many addresses in a loop. 
static ConnectionTimeouts getTCPTimeoutsWithFailover(const Settings & settings); static ConnectionTimeouts getHTTPTimeouts(const Settings & settings, Poco::Timespan http_keep_alive_timeout); + + ConnectionTimeouts aggressiveTimeouts(UInt32 attempt) const; }; } diff --git a/src/IO/S3/Client.cpp b/src/IO/S3/Client.cpp index ceb7d275299..4250342c49f 100644 --- a/src/IO/S3/Client.cpp +++ b/src/IO/S3/Client.cpp @@ -905,6 +905,7 @@ PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT s3_retry_attempts, enable_s3_requests_logging, for_disk_s3, + context->getGlobalContext()->getSettingsRef().s3_aggressive_timeouts, get_request_throttler, put_request_throttler, error_report); diff --git a/src/IO/S3/PocoHTTPClient.cpp b/src/IO/S3/PocoHTTPClient.cpp index d0f248f48a6..08ba04ee875 100644 --- a/src/IO/S3/PocoHTTPClient.cpp +++ b/src/IO/S3/PocoHTTPClient.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -99,6 +100,7 @@ PocoHTTPClientConfiguration::PocoHTTPClientConfiguration( unsigned int s3_retry_attempts_, bool enable_s3_requests_logging_, bool for_disk_s3_, + bool s3_aggressive_timeouts_, const ThrottlerPtr & get_request_throttler_, const ThrottlerPtr & put_request_throttler_, std::function error_report_) @@ -111,6 +113,7 @@ PocoHTTPClientConfiguration::PocoHTTPClientConfiguration( , for_disk_s3(for_disk_s3_) , get_request_throttler(get_request_throttler_) , put_request_throttler(put_request_throttler_) + , s3_aggressive_timeouts(s3_aggressive_timeouts_) , error_report(error_report_) { } @@ -157,6 +160,7 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & client_config Poco::Timespan(client_configuration.http_keep_alive_timeout_ms * 1000))) /// flag indicating whether keep-alive is enabled is set to each session upon creation , remote_host_filter(client_configuration.remote_host_filter) , s3_max_redirects(client_configuration.s3_max_redirects) + , s3_aggressive_timeouts(client_configuration.s3_aggressive_timeouts) , enable_s3_requests_logging(client_configuration.enable_s3_requests_logging) , for_disk_s3(client_configuration.for_disk_s3) , get_request_throttler(client_configuration.get_request_throttler) @@ -268,6 +272,37 @@ void PocoHTTPClient::addMetric(const Aws::Http::HttpRequest & request, S3MetricT ProfileEvents::increment(disk_s3_events_map[static_cast(type)][static_cast(kind)], amount); } +UInt32 extractAttempt(const Aws::String & request_info) +{ + static auto key = Aws::String("attempt="); + + auto key_begin = request_info.find(key, 0); + if (key_begin == Aws::String::npos) + return 1; + + auto val_begin = key_begin + key.size(); + auto val_end = request_info.find(';', val_begin); + if (val_end == Aws::String::npos) + val_end = request_info.size(); + + Aws::String value = request_info.substr(val_begin, val_end-val_begin); + + UInt32 attempt = 1; + ReadBufferFromString buf(value); + readIntText(attempt, buf); + return attempt; +} + +ConnectionTimeouts PocoHTTPClient::getTimeouts(Aws::Http::HttpRequest & request) const +{ + if (!s3_aggressive_timeouts) + return timeouts; + + const auto & request_info = request.GetHeaderValue(Aws::Http::SDK_REQUEST_HEADER); + auto attempt = extractAttempt(request_info); + return timeouts.aggressiveTimeouts(attempt); +} + void PocoHTTPClient::makeRequestInternal( Aws::Http::HttpRequest & request, std::shared_ptr & response, @@ -348,17 +383,17 @@ void PocoHTTPClient::makeRequestInternalImpl( /// This can lead to request signature difference on S3 side. 
if constexpr (pooled) session = makePooledHTTPSession( - target_uri, timeouts, http_connection_pool_size, wait_on_pool_size_limit, proxy_configuration); + target_uri, getTimeouts(request), http_connection_pool_size, wait_on_pool_size_limit, proxy_configuration); else - session = makeHTTPSession(target_uri, timeouts, proxy_configuration); + session = makeHTTPSession(target_uri, getTimeouts(request), proxy_configuration); } else { if constexpr (pooled) session = makePooledHTTPSession( - target_uri, timeouts, http_connection_pool_size, wait_on_pool_size_limit); + target_uri, getTimeouts(request), http_connection_pool_size, wait_on_pool_size_limit); else - session = makeHTTPSession(target_uri, timeouts); + session = makeHTTPSession(target_uri, getTimeouts(request)); } /// In case of error this address will be written to logs diff --git a/src/IO/S3/PocoHTTPClient.h b/src/IO/S3/PocoHTTPClient.h index 2a449458360..6eeff431569 100644 --- a/src/IO/S3/PocoHTTPClient.h +++ b/src/IO/S3/PocoHTTPClient.h @@ -55,6 +55,7 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration size_t http_connection_pool_size = 0; /// See PoolBase::BehaviourOnLimit bool wait_on_pool_size_limit = true; + bool s3_aggressive_timeouts = false; std::function error_report; @@ -69,6 +70,7 @@ private: unsigned int s3_retry_attempts, bool enable_s3_requests_logging_, bool for_disk_s3_, + bool s3_aggressive_timeouts_, const ThrottlerPtr & get_request_throttler_, const ThrottlerPtr & put_request_throttler_, std::function error_report_ @@ -169,6 +171,8 @@ private: Aws::Utils::RateLimits::RateLimiterInterface * readLimiter, Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const; + ConnectionTimeouts getTimeouts(Aws::Http::HttpRequest & request) const; + protected: static S3MetricKind getMetricKind(const Aws::Http::HttpRequest & request); void addMetric(const Aws::Http::HttpRequest & request, S3MetricType type, ProfileEvents::Count amount = 1) const; @@ -178,6 +182,7 @@ protected: ConnectionTimeouts timeouts; const RemoteHostFilter & remote_host_filter; unsigned int s3_max_redirects; + bool s3_aggressive_timeouts = false; bool enable_s3_requests_logging; bool for_disk_s3; From ab2594154e35a4fed769de14bfc2b720598dfaa7 Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Mon, 6 Nov 2023 20:57:16 +0100 Subject: [PATCH 02/19] add tests --- src/Disks/ObjectStorages/S3/diskSettings.cpp | 6 +- .../configs/inf_s3_retries.xml | 1 + .../configs/s3_retries.xml | 1 + .../configs/storage_conf.xml | 15 +++- .../test_checking_s3_blobs_paranoid/test.py | 74 +++++++++++++++++-- 5 files changed, 87 insertions(+), 10 deletions(-) diff --git a/src/Disks/ObjectStorages/S3/diskSettings.cpp b/src/Disks/ObjectStorages/S3/diskSettings.cpp index de88c876922..43618c64776 100644 --- a/src/Disks/ObjectStorages/S3/diskSettings.cpp +++ b/src/Disks/ObjectStorages/S3/diskSettings.cpp @@ -63,10 +63,12 @@ std::unique_ptr getClient( client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 3000); client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100); client_configuration.endpointOverride = uri.endpoint; - client_configuration.http_keep_alive_timeout_ms - = config.getUInt(config_prefix + ".http_keep_alive_timeout_ms", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT * 1000); + client_configuration.http_keep_alive_timeout_ms = config.getUInt( + config_prefix + ".http_keep_alive_timeout_ms", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT * 1000); client_configuration.http_connection_pool_size = 
config.getUInt(config_prefix + ".http_connection_pool_size", 1000); client_configuration.wait_on_pool_size_limit = false; + client_configuration.s3_aggressive_timeouts = config.getUInt( + config_prefix + ".aggressive_timeouts", client_configuration.s3_aggressive_timeouts); /* * Override proxy configuration for backwards compatibility with old configuration format. diff --git a/tests/integration/test_checking_s3_blobs_paranoid/configs/inf_s3_retries.xml b/tests/integration/test_checking_s3_blobs_paranoid/configs/inf_s3_retries.xml index 206eb4f2bad..5f0860ac120 100644 --- a/tests/integration/test_checking_s3_blobs_paranoid/configs/inf_s3_retries.xml +++ b/tests/integration/test_checking_s3_blobs_paranoid/configs/inf_s3_retries.xml @@ -4,6 +4,7 @@ 1000000 + 1 diff --git a/tests/integration/test_checking_s3_blobs_paranoid/configs/s3_retries.xml b/tests/integration/test_checking_s3_blobs_paranoid/configs/s3_retries.xml index 556bf60d385..f215a89f613 100644 --- a/tests/integration/test_checking_s3_blobs_paranoid/configs/s3_retries.xml +++ b/tests/integration/test_checking_s3_blobs_paranoid/configs/s3_retries.xml @@ -4,6 +4,7 @@ 5 + 0 diff --git a/tests/integration/test_checking_s3_blobs_paranoid/configs/storage_conf.xml b/tests/integration/test_checking_s3_blobs_paranoid/configs/storage_conf.xml index b77e72d808b..264c411b59b 100644 --- a/tests/integration/test_checking_s3_blobs_paranoid/configs/storage_conf.xml +++ b/tests/integration/test_checking_s3_blobs_paranoid/configs/storage_conf.xml @@ -7,6 +7,12 @@ + + s3 + http://minio1:9001/root/data/ + minio + minio123 + s3 http://resolver:8083/root/data/ @@ -23,9 +29,16 @@ + + +
+ s3 +
+
+
- broken_s3 + s3 diff --git a/tests/integration/test_checking_s3_blobs_paranoid/test.py b/tests/integration/test_checking_s3_blobs_paranoid/test.py index d6bcb3fb8f4..7f8664f1648 100644 --- a/tests/integration/test_checking_s3_blobs_paranoid/test.py +++ b/tests/integration/test_checking_s3_blobs_paranoid/test.py @@ -64,6 +64,8 @@ def test_upload_after_check_works(cluster, broken_s3): data String ) ENGINE=MergeTree() ORDER BY id + SETTINGS + storage_policy='broken_s3' """ ) @@ -87,7 +89,8 @@ def get_counters(node, query_id, log_type="ExceptionWhileProcessing"): SELECT ProfileEvents['S3CreateMultipartUpload'], ProfileEvents['S3UploadPart'], - ProfileEvents['S3WriteRequestsErrors'] + ProfileEvents['S3WriteRequestsErrors'], + ProfileEvents['S3PutObject'], FROM system.query_log WHERE query_id='{query_id}' AND type='{log_type}' @@ -129,7 +132,7 @@ def test_upload_s3_fail_create_multi_part_upload(cluster, broken_s3, compression assert "Code: 499" in error, error assert "mock s3 injected error" in error, error - count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters( + count_create_multi_part_uploads, count_upload_parts, count_s3_errors, _ = get_counters( node, insert_query_id ) assert count_create_multi_part_uploads == 1 @@ -172,7 +175,7 @@ def test_upload_s3_fail_upload_part_when_multi_part_upload( assert "Code: 499" in error, error assert "mock s3 injected error" in error, error - count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters( + count_create_multi_part_uploads, count_upload_parts, count_s3_errors, _ = get_counters( node, insert_query_id ) assert count_create_multi_part_uploads == 1 @@ -207,7 +210,7 @@ def test_when_s3_connection_refused_is_retried(cluster, broken_s3): query_id=insert_query_id, ) - count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters( + count_create_multi_part_uploads, count_upload_parts, count_s3_errors, _ = get_counters( node, insert_query_id, log_type="QueryFinish" ) assert count_create_multi_part_uploads == 1 @@ -279,7 +282,7 @@ def test_when_s3_connection_reset_by_peer_at_upload_is_retried( query_id=insert_query_id, ) - count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters( + count_create_multi_part_uploads, count_upload_parts, count_s3_errors, _ = get_counters( node, insert_query_id, log_type="QueryFinish" ) @@ -361,7 +364,7 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried( query_id=insert_query_id, ) - count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters( + count_create_multi_part_uploads, count_upload_parts, count_s3_errors, _ = get_counters( node, insert_query_id, log_type="QueryFinish" ) @@ -438,7 +441,7 @@ def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3): query_id=insert_query_id, ) - count_create_multi_part_uploads, count_upload_parts, count_s3_errors = get_counters( + count_create_multi_part_uploads, count_upload_parts, count_s3_errors, _ = get_counters( node, insert_query_id, log_type="QueryFinish" ) @@ -533,3 +536,60 @@ def test_query_is_canceled_with_inf_retries(cluster, broken_s3): retry_count=120, sleep_time=1, ) + + +@pytest.mark.parametrize("node_name", ["node", "node_with_inf_s3_retries"]) +def test_aggressive_timeouts(cluster, broken_s3, node_name): + node = cluster.instances[node_name] + + broken_s3.setup_fake_puts(part_length=1) + broken_s3.setup_slow_answers( + timeout=5, + count=1000000, + ) + + insert_query_id = f"TEST_AGGRESSIVE_TIMEOUTS_{node_name}" + 
node.query(
+ f"""
+ INSERT INTO
+ TABLE FUNCTION s3(
+ 'http://resolver:8083/root/data/aggressive_timeouts',
+ 'minio', 'minio123',
+ 'CSV', auto, 'none'
+ )
+ SELECT
+ *
+ FROM system.numbers
+ LIMIT 1
+ SETTINGS
+ s3_request_timeout_ms=30000,
+ s3_check_objects_after_upload=0
+ """,
+ query_id=insert_query_id,
+ )
+
+ broken_s3.reset()
+
+ _, _, count_s3_errors, count_s3_puts = get_counters(
+ node, insert_query_id, log_type="QueryFinish"
+ )
+
+ assert count_s3_puts == 1
+
+ s3_aggressive_timeouts_state = node.query(
+ f"""
+ SELECT
+ value
+ FROM system.settings
+ WHERE
+ name='s3_aggressive_timeouts'
+ """
+ ).strip()
+
+ if node_name == "node_with_inf_s3_retries":
+ # first 2 attempts failed
+ assert s3_aggressive_timeouts_state == "1"
+ assert count_s3_errors == 2
+ else:
+ assert s3_aggressive_timeouts_state == "0"
+ assert count_s3_errors == 0

From e0edd165da4df6d700fcde818cc91492f295323f Mon Sep 17 00:00:00 2001
From: Sema Checherinda
Date: Tue, 7 Nov 2023 11:21:46 +0100
Subject: [PATCH 03/19] add doc

---
 docs/en/operations/settings/settings.md | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md
index d0acad7b557..306529c4b96 100644
--- a/docs/en/operations/settings/settings.md
+++ b/docs/en/operations/settings/settings.md
@@ -4820,3 +4820,10 @@ When set to `true` the metadata files are written with `VERSION_FULL_OBJECT_KEY`
 When set to `false` the metadata files are written with the previous format version, `VERSION_INLINE_DATA`. With that format only suffixes of object storage key names are written to the metadata files. The prefix for all object storage key names is set in configuration files at the `storage_configuration.disks` section.
 
 Default value: `false`.
+
+## s3_aggressive_timeouts {#s3_aggressive_timeouts}
+
+When set to `true`, the first two attempts of each S3 request are made with low send and receive timeouts.
+When set to `false`, all attempts are made with identical timeouts.
+
+Default value: `true`.
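For orientation between patches: the behavior documented above is implemented by ConnectionTimeouts::aggressiveTimeouts from patch 01. The snippet below is a minimal, self-contained sketch of that per-attempt selection; the struct name, function name, and plain-millisecond representation are simplifications made for this note, while the real code operates on Poco::Timespan and is driven by the attempt counter parsed from the SDK request header.

#include <algorithm>
#include <cstdint>

// Sketch of the per-attempt timeout policy: cap the first attempt at 200 ms,
// the second at 1 s, and fall back to the configured timeouts afterwards.
struct SendReceiveTimeouts
{
    uint32_t send_ms;
    uint32_t receive_ms;
};

SendReceiveTimeouts adjustForAttempt(SendReceiveTimeouts configured, uint32_t attempt)
{
    // Saturate: a cap may only shorten a timeout, never extend it.
    auto saturate = [](uint32_t cap_ms, uint32_t value_ms) { return std::min(cap_ms, value_ms); };

    if (attempt == 1)
        return {saturate(200, configured.send_ms), saturate(200, configured.receive_ms)};
    if (attempt == 2)
        return {saturate(1000, configured.send_ms), saturate(1000, configured.receive_ms)};
    return configured;
}

Because the low values are saturated against the configured timeouts, enabling the setting can only make an individual attempt fail faster; it never extends a timeout beyond what was configured.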
From 338c51745f1709220a01a3684b5e305ff64ff788 Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Tue, 7 Nov 2023 12:08:38 +0100 Subject: [PATCH 04/19] fix style --- .../test_checking_s3_blobs_paranoid/test.py | 75 ++++++++++++------- 1 file changed, 46 insertions(+), 29 deletions(-) diff --git a/tests/integration/test_checking_s3_blobs_paranoid/test.py b/tests/integration/test_checking_s3_blobs_paranoid/test.py index 7f8664f1648..441a5a541e8 100644 --- a/tests/integration/test_checking_s3_blobs_paranoid/test.py +++ b/tests/integration/test_checking_s3_blobs_paranoid/test.py @@ -80,7 +80,7 @@ def test_upload_after_check_works(cluster, broken_s3): assert "suddenly disappeared" in error, error -def get_counters(node, query_id, log_type="ExceptionWhileProcessing"): +def get_multipart_counters(node, query_id, log_type="ExceptionWhileProcessing"): node.query("SYSTEM FLUSH LOGS") return [ int(x) @@ -90,7 +90,24 @@ def get_counters(node, query_id, log_type="ExceptionWhileProcessing"): ProfileEvents['S3CreateMultipartUpload'], ProfileEvents['S3UploadPart'], ProfileEvents['S3WriteRequestsErrors'], + FROM system.query_log + WHERE query_id='{query_id}' + AND type='{log_type}' + """ + ).split() + if x + ] + + +def get_put_counters(node, query_id, log_type="ExceptionWhileProcessing"): + node.query("SYSTEM FLUSH LOGS") + return [ + int(x) + for x in node.query( + f""" + SELECT ProfileEvents['S3PutObject'], + ProfileEvents['S3WriteRequestsErrors'], FROM system.query_log WHERE query_id='{query_id}' AND type='{log_type}' @@ -132,12 +149,12 @@ def test_upload_s3_fail_create_multi_part_upload(cluster, broken_s3, compression assert "Code: 499" in error, error assert "mock s3 injected error" in error, error - count_create_multi_part_uploads, count_upload_parts, count_s3_errors, _ = get_counters( + create_multipart, upload_parts, s3_errors = get_multipart_counters( node, insert_query_id ) - assert count_create_multi_part_uploads == 1 - assert count_upload_parts == 0 - assert count_s3_errors == 1 + assert create_multipart == 1 + assert upload_parts == 0 + assert s3_errors == 1 # Add "lz4" compression method in the list after https://github.com/ClickHouse/ClickHouse/issues/50975 is fixed @@ -175,12 +192,12 @@ def test_upload_s3_fail_upload_part_when_multi_part_upload( assert "Code: 499" in error, error assert "mock s3 injected error" in error, error - count_create_multi_part_uploads, count_upload_parts, count_s3_errors, _ = get_counters( + create_multipart, upload_parts, s3_errors = get_multipart_counters( node, insert_query_id ) - assert count_create_multi_part_uploads == 1 - assert count_upload_parts >= 2 - assert count_s3_errors >= 2 + assert create_multipart == 1 + assert upload_parts >= 2 + assert s3_errors >= 2 def test_when_s3_connection_refused_is_retried(cluster, broken_s3): @@ -210,12 +227,12 @@ def test_when_s3_connection_refused_is_retried(cluster, broken_s3): query_id=insert_query_id, ) - count_create_multi_part_uploads, count_upload_parts, count_s3_errors, _ = get_counters( + create_multipart, upload_parts, s3_errors = get_multipart_counters( node, insert_query_id, log_type="QueryFinish" ) - assert count_create_multi_part_uploads == 1 - assert count_upload_parts == 39 - assert count_s3_errors == 3 + assert create_multipart == 1 + assert upload_parts == 39 + assert s3_errors == 3 broken_s3.setup_at_part_upload(count=1000, after=2, action="connection_refused") insert_query_id = f"INSERT_INTO_TABLE_FUNCTION_CONNECTION_REFUSED_RETRIED_1" @@ -282,13 +299,13 @@ def 
test_when_s3_connection_reset_by_peer_at_upload_is_retried( query_id=insert_query_id, ) - count_create_multi_part_uploads, count_upload_parts, count_s3_errors, _ = get_counters( + create_multipart, upload_parts, s3_errors = get_multipart_counters( node, insert_query_id, log_type="QueryFinish" ) - assert count_create_multi_part_uploads == 1 - assert count_upload_parts == 39 - assert count_s3_errors == 3 + assert create_multipart == 1 + assert upload_parts == 39 + assert s3_errors == 3 broken_s3.setup_at_part_upload( count=1000, @@ -364,13 +381,13 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried( query_id=insert_query_id, ) - count_create_multi_part_uploads, count_upload_parts, count_s3_errors, _ = get_counters( + create_multipart, upload_parts, s3_errors = get_multipart_counters( node, insert_query_id, log_type="QueryFinish" ) - assert count_create_multi_part_uploads == 1 - assert count_upload_parts == 39 - assert count_s3_errors == 3 + assert create_multipart == 1 + assert upload_parts == 39 + assert s3_errors == 3 broken_s3.setup_at_create_multi_part_upload( count=1000, @@ -441,13 +458,13 @@ def test_when_s3_broken_pipe_at_upload_is_retried(cluster, broken_s3): query_id=insert_query_id, ) - count_create_multi_part_uploads, count_upload_parts, count_s3_errors, _ = get_counters( + create_multipart, upload_parts, s3_errors = get_multipart_counters( node, insert_query_id, log_type="QueryFinish" ) - assert count_create_multi_part_uploads == 1 - assert count_upload_parts == 7 - assert count_s3_errors == 3 + assert create_multipart == 1 + assert upload_parts == 7 + assert s3_errors == 3 broken_s3.setup_at_part_upload( count=1000, @@ -570,11 +587,11 @@ def test_aggressive_timeouts(cluster, broken_s3, node_name): broken_s3.reset() - _, _, count_s3_errors, count_s3_puts = get_counters( + put_objects, s3_errors = get_put_counters( node, insert_query_id, log_type="QueryFinish" ) - assert count_s3_puts == 1 + assert put_objects == 1 s3_aggressive_timeouts_state = node.query( f""" @@ -589,7 +606,7 @@ def test_aggressive_timeouts(cluster, broken_s3, node_name): if node_name == "node_with_inf_s3_retries": # first 2 attempts failed assert s3_aggressive_timeouts_state == "1" - assert count_s3_errors == 2 + assert s3_errors == 2 else: assert s3_aggressive_timeouts_state == "0" - assert count_s3_errors == 0 + assert s3_errors == 0 From 1a7be21a66eb498a6d52718d9cd7f37ba9729213 Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Tue, 7 Nov 2023 18:05:57 +0100 Subject: [PATCH 05/19] adjust tests --- .../configs/config.d/storage_conf.xml | 2 ++ tests/integration/test_storage_s3/configs/defaultS3.xml | 5 ----- tests/integration/test_storage_s3/configs/s3_retry.xml | 2 +- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml b/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml index 235b9a7b7a1..f51b854de75 100644 --- a/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml +++ b/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml @@ -11,6 +11,7 @@ true 0 + 0 20000 @@ -33,6 +34,7 @@ true 1 + 0 1 20000 diff --git a/tests/integration/test_storage_s3/configs/defaultS3.xml b/tests/integration/test_storage_s3/configs/defaultS3.xml index 37454ef6781..7dac6d9fbb5 100644 --- a/tests/integration/test_storage_s3/configs/defaultS3.xml +++ b/tests/integration/test_storage_s3/configs/defaultS3.xml @@ -1,9 +1,4 @@ - - - 5 - - 
http://resolver:8080 diff --git a/tests/integration/test_storage_s3/configs/s3_retry.xml b/tests/integration/test_storage_s3/configs/s3_retry.xml index 727e23273cf..581fc44c8d4 100644 --- a/tests/integration/test_storage_s3/configs/s3_retry.xml +++ b/tests/integration/test_storage_s3/configs/s3_retry.xml @@ -1,7 +1,7 @@ - 5 + 10 From 45de9beab4231a806ea247adef0a1fc5180748ba Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Wed, 8 Nov 2023 13:19:28 +0100 Subject: [PATCH 06/19] set new timeout for session from connection pool --- base/poco/Net/src/HTTPSession.cpp | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/base/poco/Net/src/HTTPSession.cpp b/base/poco/Net/src/HTTPSession.cpp index d2663baaf9f..d30f5590280 100644 --- a/base/poco/Net/src/HTTPSession.cpp +++ b/base/poco/Net/src/HTTPSession.cpp @@ -94,8 +94,22 @@ void HTTPSession::setTimeout(const Poco::Timespan& timeout) void HTTPSession::setTimeout(const Poco::Timespan& connectionTimeout, const Poco::Timespan& sendTimeout, const Poco::Timespan& receiveTimeout) { _connectionTimeout = connectionTimeout; - _sendTimeout = sendTimeout; - _receiveTimeout = receiveTimeout; + + if (_sendTimeout != sendTimeout) + { + _sendTimeout = sendTimeout; + + if (connected()) + _socket.setSendTimeout(_sendTimeout); + } + + if (_receiveTimeout != receiveTimeout) + { + _receiveTimeout = receiveTimeout; + + if (connected()) + _socket.setReceiveTimeout(_receiveTimeout); + } } From be01a5cd3e07eeba990a8dcc3e69e62f3492d05e Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Wed, 8 Nov 2023 17:32:06 +0100 Subject: [PATCH 07/19] turn off agressive timeouts for heavy requests --- src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp | 6 +++++- src/Disks/ObjectStorages/S3/S3ObjectStorage.h | 1 + src/IO/S3/Client.cpp | 9 ++++++--- src/IO/S3/Client.h | 6 ++---- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp index b36185249af..aa4bcd7fbad 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp @@ -537,7 +537,11 @@ std::unique_ptr S3ObjectStorage::cloneObjectStorage( } S3ObjectStorage::Clients::Clients(std::shared_ptr client_, const S3ObjectStorageSettings & settings) - : client(std::move(client_)), client_with_long_timeout(client->clone(std::nullopt, settings.request_settings.long_request_timeout_ms)) {} + : client(std::move(client_)) + , client_with_long_timeout(client->clone( + /*override_aggressive_timeouts*/ false, + settings.request_settings.long_request_timeout_ms)) +{} ObjectStorageKey S3ObjectStorage::generateObjectKeyForPath(const std::string &) const { diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h index b1b3fb22366..37e491e21dc 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h @@ -184,6 +184,7 @@ private: std::string bucket; String object_key_prefix; + MultiVersion clients; MultiVersion s3_settings; S3Capabilities s3_capabilities; diff --git a/src/IO/S3/Client.cpp b/src/IO/S3/Client.cpp index 4250342c49f..12a0cb8f93c 100644 --- a/src/IO/S3/Client.cpp +++ b/src/IO/S3/Client.cpp @@ -119,14 +119,17 @@ std::unique_ptr Client::create( } std::unique_ptr Client::clone( - std::optional> override_retry_strategy, + std::optional override_aggressive_timeouts, std::optional override_request_timeout_ms) const { PocoHTTPClientConfiguration 
new_configuration = client_configuration; - if (override_retry_strategy.has_value()) - new_configuration.retryStrategy = *override_retry_strategy; + if (override_request_timeout_ms.has_value()) new_configuration.requestTimeoutMs = *override_request_timeout_ms; + + if (override_aggressive_timeouts.has_value()) + new_configuration.s3_aggressive_timeouts = *override_aggressive_timeouts; + return std::unique_ptr(new Client(*this, new_configuration)); } diff --git a/src/IO/S3/Client.h b/src/IO/S3/Client.h index 48310bc21af..81ab3854d3d 100644 --- a/src/IO/S3/Client.h +++ b/src/IO/S3/Client.h @@ -119,13 +119,11 @@ public: bool use_virtual_addressing); /// Create a client with adjusted settings: - /// * override_retry_strategy can be used to disable retries to avoid nested retries when we have - /// a retry loop outside of S3 client. Specifically, for read and write buffers. Currently not - /// actually used. /// * override_request_timeout_ms is used to increase timeout for CompleteMultipartUploadRequest /// because it often sits idle for 10 seconds: https://github.com/ClickHouse/ClickHouse/pull/42321 + /// * s3_aggressive_timeouts is used to turn off s3_aggressive_timeouts feature for CompleteMultipartUploadRequest std::unique_ptr clone( - std::optional> override_retry_strategy = std::nullopt, + std::optional override_aggressive_timeouts = std::nullopt, std::optional override_request_timeout_ms = std::nullopt) const; Client & operator=(const Client &) = delete; From 27fb25d056c420bca141ce2ecd83868d15fd07ef Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Thu, 9 Nov 2023 13:10:52 +0100 Subject: [PATCH 08/19] alter the naming, fix client_with_long_timeout in s3 storage --- src/Core/Settings.h | 2 +- src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp | 2 +- src/Disks/ObjectStorages/S3/diskSettings.cpp | 4 ++-- src/IO/S3/Client.cpp | 8 ++++---- src/IO/S3/Client.h | 4 ++-- src/IO/S3/PocoHTTPClient.cpp | 8 ++++---- src/IO/S3/PocoHTTPClient.h | 6 +++--- src/Storages/StorageS3.cpp | 2 +- 8 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index b1459b6f328..3f80c83ff5f 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -94,7 +94,7 @@ class IColumn; M(UInt64, s3_max_put_rps, 0, "Limit on S3 PUT request per second rate before throttling. Zero means unlimited.", 0) \ M(UInt64, s3_max_put_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. 
By default (0) equals to `s3_max_put_rps`", 0) \ M(UInt64, s3_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \ - M(Bool, s3_aggressive_timeouts, true, "When aggressive timeouts are enabled first two attempts are made with low receive and send timeout", 0) \ + M(Bool, s3_use_adaptive_timeouts, true, "When aggressive timeouts are enabled first two attempts are made with low receive and send timeout", 0) \ M(UInt64, azure_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \ M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \ M(Bool, azure_truncate_on_insert, false, "Enables or disables truncate before insert in azure engine tables.", 0) \ diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp index aa4bcd7fbad..8a46bfd59d1 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp @@ -539,7 +539,7 @@ std::unique_ptr S3ObjectStorage::cloneObjectStorage( S3ObjectStorage::Clients::Clients(std::shared_ptr client_, const S3ObjectStorageSettings & settings) : client(std::move(client_)) , client_with_long_timeout(client->clone( - /*override_aggressive_timeouts*/ false, + /*override_use_adaptive_timeouts*/ false, settings.request_settings.long_request_timeout_ms)) {} diff --git a/src/Disks/ObjectStorages/S3/diskSettings.cpp b/src/Disks/ObjectStorages/S3/diskSettings.cpp index 43618c64776..573fa744ce6 100644 --- a/src/Disks/ObjectStorages/S3/diskSettings.cpp +++ b/src/Disks/ObjectStorages/S3/diskSettings.cpp @@ -67,8 +67,8 @@ std::unique_ptr getClient( config_prefix + ".http_keep_alive_timeout_ms", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT * 1000); client_configuration.http_connection_pool_size = config.getUInt(config_prefix + ".http_connection_pool_size", 1000); client_configuration.wait_on_pool_size_limit = false; - client_configuration.s3_aggressive_timeouts = config.getUInt( - config_prefix + ".aggressive_timeouts", client_configuration.s3_aggressive_timeouts); + client_configuration.s3_use_adaptive_timeouts = config.getUInt( + config_prefix + ".use_adaptive_timeouts", client_configuration.s3_use_adaptive_timeouts); /* * Override proxy configuration for backwards compatibility with old configuration format. 
diff --git a/src/IO/S3/Client.cpp b/src/IO/S3/Client.cpp index 12a0cb8f93c..90806852c1e 100644 --- a/src/IO/S3/Client.cpp +++ b/src/IO/S3/Client.cpp @@ -119,7 +119,7 @@ std::unique_ptr Client::create( } std::unique_ptr Client::clone( - std::optional override_aggressive_timeouts, + std::optional override_use_adaptive_timeouts, std::optional override_request_timeout_ms) const { PocoHTTPClientConfiguration new_configuration = client_configuration; @@ -127,8 +127,8 @@ std::unique_ptr Client::clone( if (override_request_timeout_ms.has_value()) new_configuration.requestTimeoutMs = *override_request_timeout_ms; - if (override_aggressive_timeouts.has_value()) - new_configuration.s3_aggressive_timeouts = *override_aggressive_timeouts; + if (override_use_adaptive_timeouts.has_value()) + new_configuration.s3_use_adaptive_timeouts = *override_use_adaptive_timeouts; return std::unique_ptr(new Client(*this, new_configuration)); } @@ -908,7 +908,7 @@ PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT s3_retry_attempts, enable_s3_requests_logging, for_disk_s3, - context->getGlobalContext()->getSettingsRef().s3_aggressive_timeouts, + context->getGlobalContext()->getSettingsRef().s3_use_adaptive_timeouts, get_request_throttler, put_request_throttler, error_report); diff --git a/src/IO/S3/Client.h b/src/IO/S3/Client.h index 81ab3854d3d..be7235eb9f1 100644 --- a/src/IO/S3/Client.h +++ b/src/IO/S3/Client.h @@ -121,9 +121,9 @@ public: /// Create a client with adjusted settings: /// * override_request_timeout_ms is used to increase timeout for CompleteMultipartUploadRequest /// because it often sits idle for 10 seconds: https://github.com/ClickHouse/ClickHouse/pull/42321 - /// * s3_aggressive_timeouts is used to turn off s3_aggressive_timeouts feature for CompleteMultipartUploadRequest + /// * s3_use_adaptive_timeouts is used to turn off s3_use_adaptive_timeouts feature for CompleteMultipartUploadRequest std::unique_ptr clone( - std::optional override_aggressive_timeouts = std::nullopt, + std::optional override_use_adaptive_timeouts = std::nullopt, std::optional override_request_timeout_ms = std::nullopt) const; Client & operator=(const Client &) = delete; diff --git a/src/IO/S3/PocoHTTPClient.cpp b/src/IO/S3/PocoHTTPClient.cpp index 08ba04ee875..f783a886877 100644 --- a/src/IO/S3/PocoHTTPClient.cpp +++ b/src/IO/S3/PocoHTTPClient.cpp @@ -100,7 +100,7 @@ PocoHTTPClientConfiguration::PocoHTTPClientConfiguration( unsigned int s3_retry_attempts_, bool enable_s3_requests_logging_, bool for_disk_s3_, - bool s3_aggressive_timeouts_, + bool s3_use_adaptive_timeouts_, const ThrottlerPtr & get_request_throttler_, const ThrottlerPtr & put_request_throttler_, std::function error_report_) @@ -113,7 +113,7 @@ PocoHTTPClientConfiguration::PocoHTTPClientConfiguration( , for_disk_s3(for_disk_s3_) , get_request_throttler(get_request_throttler_) , put_request_throttler(put_request_throttler_) - , s3_aggressive_timeouts(s3_aggressive_timeouts_) + , s3_use_adaptive_timeouts(s3_use_adaptive_timeouts_) , error_report(error_report_) { } @@ -160,7 +160,7 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & client_config Poco::Timespan(client_configuration.http_keep_alive_timeout_ms * 1000))) /// flag indicating whether keep-alive is enabled is set to each session upon creation , remote_host_filter(client_configuration.remote_host_filter) , s3_max_redirects(client_configuration.s3_max_redirects) - , s3_aggressive_timeouts(client_configuration.s3_aggressive_timeouts) + , 
s3_use_adaptive_timeouts(client_configuration.s3_use_adaptive_timeouts) , enable_s3_requests_logging(client_configuration.enable_s3_requests_logging) , for_disk_s3(client_configuration.for_disk_s3) , get_request_throttler(client_configuration.get_request_throttler) @@ -295,7 +295,7 @@ UInt32 extractAttempt(const Aws::String & request_info) ConnectionTimeouts PocoHTTPClient::getTimeouts(Aws::Http::HttpRequest & request) const { - if (!s3_aggressive_timeouts) + if (!s3_use_adaptive_timeouts) return timeouts; const auto & request_info = request.GetHeaderValue(Aws::Http::SDK_REQUEST_HEADER); diff --git a/src/IO/S3/PocoHTTPClient.h b/src/IO/S3/PocoHTTPClient.h index 6eeff431569..9ba5f4ffe64 100644 --- a/src/IO/S3/PocoHTTPClient.h +++ b/src/IO/S3/PocoHTTPClient.h @@ -55,7 +55,7 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration size_t http_connection_pool_size = 0; /// See PoolBase::BehaviourOnLimit bool wait_on_pool_size_limit = true; - bool s3_aggressive_timeouts = false; + bool s3_use_adaptive_timeouts = false; std::function error_report; @@ -70,7 +70,7 @@ private: unsigned int s3_retry_attempts, bool enable_s3_requests_logging_, bool for_disk_s3_, - bool s3_aggressive_timeouts_, + bool s3_use_adaptive_timeouts_, const ThrottlerPtr & get_request_throttler_, const ThrottlerPtr & put_request_throttler_, std::function error_report_ @@ -182,7 +182,7 @@ protected: ConnectionTimeouts timeouts; const RemoteHostFilter & remote_host_filter; unsigned int s3_max_redirects; - bool s3_aggressive_timeouts = false; + bool s3_use_adaptive_timeouts = false; bool enable_s3_requests_logging; bool for_disk_s3; diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 63ed84680c9..231efb87e87 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -1330,7 +1330,7 @@ void StorageS3::Configuration::connect(ContextPtr context) auth_settings.no_sign_request.value_or(context->getConfigRef().getBool("s3.no_sign_request", false)), }); - client_with_long_timeout = client->clone(std::nullopt, request_settings.long_request_timeout_ms); + client_with_long_timeout = client->clone(/*override_use_adaptive_timeouts*/ false, request_settings.long_request_timeout_ms); } void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection) From bb0b6afe14319799028fbd8483b3bc4042e6f951 Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Thu, 9 Nov 2023 13:12:38 +0100 Subject: [PATCH 09/19] reduce cuncurrent request number to the minio in test_storage_s3 --- tests/integration/test_storage_s3/configs/s3_retry.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_storage_s3/configs/s3_retry.xml b/tests/integration/test_storage_s3/configs/s3_retry.xml index 581fc44c8d4..b7a7bbc8a9b 100644 --- a/tests/integration/test_storage_s3/configs/s3_retry.xml +++ b/tests/integration/test_storage_s3/configs/s3_retry.xml @@ -2,6 +2,7 @@ 10 + 5 From 76d11687a76ede0a0fd080fd76ff04da501e7af6 Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Thu, 9 Nov 2023 13:12:56 +0100 Subject: [PATCH 10/19] adjuct docs --- docs/en/operations/settings/settings.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index 306529c4b96..34ed85c773a 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -4821,7 +4821,7 @@ When set to `false` the metadata files are written with the 
previous format version
 
 Default value: `false`.
 
-## s3_aggressive_timeouts {#s3_aggressive_timeouts}
+## s3_use_adaptive_timeouts {#s3_use_adaptive_timeouts}
 
 When set to `true`, the first two attempts of each S3 request are made with low send and receive timeouts.
 When set to `false`, all attempts are made with identical timeouts.

From bb0b6afe14319799028fbd8483b3bc4042e6f951 Mon Sep 17 00:00:00 2001
From: Sema Checherinda
Date: Thu, 9 Nov 2023 23:54:31 +0100
Subject: [PATCH 11/19] get rid of client_with_long_timeout_ptr

---
 src/Backups/BackupIO_S3.cpp | 5 +-
 src/Coordination/KeeperSnapshotManagerS3.cpp | 1 -
 src/Core/Settings.h | 2 +-
 .../ObjectStorages/S3/S3ObjectStorage.cpp | 62 ++++++-------------
 src/Disks/ObjectStorages/S3/S3ObjectStorage.h | 14 +----
 src/Disks/ObjectStorages/S3/diskSettings.cpp | 2 +-
 src/IO/ConnectionTimeouts.cpp | 53 ++++++++++++----
 src/IO/ConnectionTimeouts.h | 3 +-
 src/IO/S3/Client.cpp | 14 +----
 src/IO/S3/Client.h | 8 +--
 src/IO/S3/PocoHTTPClient.cpp | 2 +-
 src/IO/S3/copyS3File.cpp | 26 +++-----
 src/IO/S3/copyS3File.h | 7 ---
 src/IO/S3/tests/gtest_aws_s3_client.cpp | 1 -
 src/IO/WriteBufferFromS3.cpp | 4 +-
 src/IO/WriteBufferFromS3.h | 3 -
 src/IO/tests/gtest_writebuffer_s3.cpp | 1 -
 src/Storages/StorageS3.cpp | 3 -
 src/Storages/StorageS3.h | 1 -
 src/Storages/StorageS3Settings.h | 3 +-
 20 files changed, 83 insertions(+), 132 deletions(-)

diff --git a/src/Backups/BackupIO_S3.cpp b/src/Backups/BackupIO_S3.cpp
index 0b700665988..4f83158d07d 100644
--- a/src/Backups/BackupIO_S3.cpp
+++ b/src/Backups/BackupIO_S3.cpp
@@ -169,7 +169,6 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
 blob_path.size(), mode);
 
 copyS3File(
- client,
 client,
 s3_uri.bucket,
 fs::path(s3_uri.key) / path_in_backup,
@@ -231,7 +230,6 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
 {
 LOG_TRACE(log, "Copying file {} from disk {} to S3", src_path, src_disk->getName());
 copyS3File(
- client,
 client,
 /* src_bucket */ blob_path[1],
 /* src_key= */ blob_path[0],
@@ -253,7 +251,7 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
 
 void BackupWriterS3::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
 {
- copyDataToS3File(create_read_buffer, start_pos, length, client, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, s3_settings.request_settings, {},
+ copyDataToS3File(create_read_buffer, start_pos, length, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, s3_settings.request_settings, {},
 threadPoolCallbackRunner(getBackupsIOThreadPool().get(), "BackupWriterS3"));
 }
 
@@ -283,7 +281,6 @@ std::unique_ptr BackupWriterS3::writeFile(const String & file_name)
 {
 return std::make_unique(
 client,
- client, // already has long timeout
 s3_uri.bucket,
 fs::path(s3_uri.key) / file_name,
 DBMS_DEFAULT_BUFFER_SIZE,
diff --git a/src/Coordination/KeeperSnapshotManagerS3.cpp b/src/Coordination/KeeperSnapshotManagerS3.cpp
index 302e05c8418..bedde0d7b39 100644
--- a/src/Coordination/KeeperSnapshotManagerS3.cpp
+++ b/src/Coordination/KeeperSnapshotManagerS3.cpp
@@ -148,7 +148,6 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
 const auto create_writer = [&](const auto & key)
 {
 return WriteBufferFromS3(
- s3_client->client,
 s3_client->client,
 s3_client->uri.bucket,
 key,
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 3f80c83ff5f..34547aded9c 100644
---
a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -105,7 +105,7 @@ class IColumn; M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \ M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \ M(UInt64, s3_retry_attempts, 100, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \ - M(UInt64, s3_request_timeout_ms, 3000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \ + M(UInt64, s3_request_timeout_ms, 30000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \ M(UInt64, s3_http_connection_pool_size, 1000, "How many reusable open connections to keep per S3 endpoint. Only applies to the S3 table engine and table function, not to S3 disks (for disks, use disk config instead). Global setting, can only be set in config, overriding it per session or per query has no effect.", 0) \ M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \ M(String, s3queue_default_zookeeper_path, "/clickhouse/s3queue/", "Default zookeeper path prefix for S3Queue engine", 0) \ diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp index 8a46bfd59d1..75dd405f6aa 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp @@ -153,7 +153,7 @@ private: bool S3ObjectStorage::exists(const StoredObject & object) const { auto settings_ptr = s3_settings.get(); - return S3::objectExists(*clients.get()->client, bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true); + return S3::objectExists(*client.get(), bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true); } std::unique_ptr S3ObjectStorage::readObjects( /// NOLINT @@ -172,7 +172,7 @@ std::unique_ptr S3ObjectStorage::readObjects( /// NOLINT (const std::string & path, size_t read_until_position) -> std::unique_ptr { return std::make_unique( - clients.get()->client, + client.get(), bucket, path, version_id, @@ -222,7 +222,7 @@ std::unique_ptr S3ObjectStorage::readObject( /// NOLINT { auto settings_ptr = s3_settings.get(); return std::make_unique( - clients.get()->client, + client.get(), bucket, object.remote_path, version_id, @@ -247,10 +247,8 @@ std::unique_ptr S3ObjectStorage::writeObject( /// NOLIN if (write_settings.s3_allow_parallel_part_upload) scheduler = threadPoolCallbackRunner(getThreadPoolWriter(), "VFSWrite"); - auto clients_ = clients.get(); return std::make_unique( - clients_->client, - clients_->client_with_long_timeout, + client.get(), bucket, object.remote_path, buf_size, @@ -264,15 +262,12 @@ std::unique_ptr S3ObjectStorage::writeObject( /// NOLIN ObjectStorageIteratorPtr S3ObjectStorage::iterate(const std::string & path_prefix) const { auto settings_ptr = s3_settings.get(); - auto client_ptr = clients.get()->client; - - return std::make_shared(bucket, path_prefix, client_ptr, settings_ptr->list_object_keys_size); + return std::make_shared(bucket, path_prefix, client.get(), settings_ptr->list_object_keys_size); } void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const { auto settings_ptr = 
s3_settings.get(); - auto client_ptr = clients.get()->client; S3::ListObjectsV2Request request; request.SetBucket(bucket); @@ -287,7 +282,7 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet { ProfileEvents::increment(ProfileEvents::S3ListObjects); ProfileEvents::increment(ProfileEvents::DiskS3ListObjects); - outcome = client_ptr->ListObjectsV2(request); + outcome = client.get()->ListObjectsV2(request); throwIfError(outcome); auto result = outcome.GetResult(); @@ -318,14 +313,12 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exists) { - auto client_ptr = clients.get()->client; - ProfileEvents::increment(ProfileEvents::S3DeleteObjects); ProfileEvents::increment(ProfileEvents::DiskS3DeleteObjects); S3::DeleteObjectRequest request; request.SetBucket(bucket); request.SetKey(object.remote_path); - auto outcome = client_ptr->DeleteObject(request); + auto outcome = client.get()->DeleteObject(request); throwIfUnexpectedError(outcome, if_exists); @@ -344,7 +337,6 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e } else { - auto client_ptr = clients.get()->client; auto settings_ptr = s3_settings.get(); size_t chunk_size_limit = settings_ptr->objects_chunk_size_to_delete; @@ -373,7 +365,7 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e S3::DeleteObjectsRequest request; request.SetBucket(bucket); request.SetDelete(delkeys); - auto outcome = client_ptr->DeleteObjects(request); + auto outcome = client.get()->DeleteObjects(request); throwIfUnexpectedError(outcome, if_exists); @@ -405,7 +397,7 @@ void S3ObjectStorage::removeObjectsIfExist(const StoredObjects & objects) std::optional S3ObjectStorage::tryGetObjectMetadata(const std::string & path) const { auto settings_ptr = s3_settings.get(); - auto object_info = S3::getObjectInfo(*clients.get()->client, bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false); + auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false); if (object_info.size == 0 && object_info.last_modification_time == 0 && object_info.metadata.empty()) return {}; @@ -421,7 +413,7 @@ std::optional S3ObjectStorage::tryGetObjectMetadata(const std::s ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) const { auto settings_ptr = s3_settings.get(); - auto object_info = S3::getObjectInfo(*clients.get()->client, bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true); + auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true); ObjectMetadata result; result.size_bytes = object_info.size; @@ -442,12 +434,12 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT /// Shortcut for S3 if (auto * dest_s3 = dynamic_cast(&object_storage_to); dest_s3 != nullptr) { - auto clients_ = clients.get(); + auto client_ = client.get(); auto settings_ptr = s3_settings.get(); - auto size = S3::getObjectSize(*clients_->client, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true); + auto size = S3::getObjectSize(*client_, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* 
for_disk_s3= */ true); auto scheduler = threadPoolCallbackRunner(getThreadPoolWriter(), "S3ObjStor_copy"); - copyS3File(clients_->client, - clients_->client_with_long_timeout, + copyS3File( + client.get(), bucket, object_from.remote_path, 0, @@ -471,12 +463,11 @@ void S3ObjectStorage::copyObject( // NOLINT const WriteSettings &, std::optional object_to_attributes) { - auto clients_ = clients.get(); + auto client_ = client.get(); auto settings_ptr = s3_settings.get(); - auto size = S3::getObjectSize(*clients_->client, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true); + auto size = S3::getObjectSize(*client_, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true); auto scheduler = threadPoolCallbackRunner(getThreadPoolWriter(), "S3ObjStor_copy"); - copyS3File(clients_->client, - clients_->client_with_long_timeout, + copyS3File(client_, bucket, object_from.remote_path, 0, @@ -497,31 +488,25 @@ void S3ObjectStorage::setNewSettings(std::unique_ptr && void S3ObjectStorage::shutdown() { - auto clients_ptr = clients.get(); /// This call stops any next retry attempts for ongoing S3 requests. /// If S3 request is failed and the method below is executed S3 client immediately returns the last failed S3 request outcome. /// If S3 is healthy nothing wrong will be happened and S3 requests will be processed in a regular way without errors. /// This should significantly speed up shutdown process if S3 is unhealthy. - const_cast(*clients_ptr->client).DisableRequestProcessing(); - const_cast(*clients_ptr->client_with_long_timeout).DisableRequestProcessing(); + const_cast(*client.get()).DisableRequestProcessing(); } void S3ObjectStorage::startup() { - auto clients_ptr = clients.get(); - /// Need to be enabled if it was disabled during shutdown() call. - const_cast(*clients_ptr->client).EnableRequestProcessing(); - const_cast(*clients_ptr->client_with_long_timeout).EnableRequestProcessing(); + const_cast(*client.get()).EnableRequestProcessing(); } void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) { auto new_s3_settings = getSettings(config, config_prefix, context); auto new_client = getClient(config, config_prefix, context, *new_s3_settings); - auto new_clients = std::make_unique(std::move(new_client), *new_s3_settings); s3_settings.set(std::move(new_s3_settings)); - clients.set(std::move(new_clients)); + client.set(std::move(new_client)); } std::unique_ptr S3ObjectStorage::cloneObjectStorage( @@ -536,13 +521,6 @@ std::unique_ptr S3ObjectStorage::cloneObjectStorage( endpoint, object_key_prefix); } -S3ObjectStorage::Clients::Clients(std::shared_ptr client_, const S3ObjectStorageSettings & settings) - : client(std::move(client_)) - , client_with_long_timeout(client->clone( - /*override_use_adaptive_timeouts*/ false, - settings.request_settings.long_request_timeout_ms)) -{} - ObjectStorageKey S3ObjectStorage::generateObjectKeyForPath(const std::string &) const { /// Path to store the new S3 object. 
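The refactoring above replaces the two-client struct with a single MultiVersion<S3::Client>: readers take a snapshot with client.get(), and applyNewSettings() publishes a replacement with client.set(). As a rough illustration of that pattern, here is a simplified snapshot holder written for this note; it is not ClickHouse's actual MultiVersion implementation, which is more general.

#include <memory>
#include <mutex>

// Illustrative stand-in for a MultiVersion holder: readers grab a cheap
// shared_ptr snapshot; a writer swaps in a new version atomically with
// respect to readers. Old versions stay alive as long as any in-flight
// operation still holds its snapshot.
template <typename T>
class VersionHolder
{
public:
    std::shared_ptr<const T> get() const
    {
        std::lock_guard<std::mutex> lock(mutex);
        return current;
    }

    void set(std::unique_ptr<const T> new_version)
    {
        std::shared_ptr<const T> replacement = std::move(new_version);
        std::lock_guard<std::mutex> lock(mutex);
        current = std::move(replacement);
    }

private:
    mutable std::mutex mutex;
    std::shared_ptr<const T> current;
};

In-flight requests keep using the client version they started with, while requests issued after applyNewSettings() immediately see the reconfigured one.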
diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h index 37e491e21dc..7d14482311f 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h @@ -39,16 +39,6 @@ struct S3ObjectStorageSettings class S3ObjectStorage : public IObjectStorage { -public: - struct Clients - { - std::shared_ptr client; - std::shared_ptr client_with_long_timeout; - - Clients() = default; - Clients(std::shared_ptr client, const S3ObjectStorageSettings & settings); - }; - private: friend class S3PlainObjectStorage; @@ -63,7 +53,7 @@ private: String object_key_prefix_) : bucket(std::move(bucket_)) , object_key_prefix(std::move(object_key_prefix_)) - , clients(std::make_unique(std::move(client_), *s3_settings_)) + , client(std::move(client_)) , s3_settings(std::move(s3_settings_)) , s3_capabilities(s3_capabilities_) , version_id(std::move(version_id_)) @@ -185,7 +175,7 @@ private: String object_key_prefix; - MultiVersion clients; + MultiVersion client; MultiVersion s3_settings; S3Capabilities s3_capabilities; diff --git a/src/Disks/ObjectStorages/S3/diskSettings.cpp b/src/Disks/ObjectStorages/S3/diskSettings.cpp index 573fa744ce6..b0384daab2d 100644 --- a/src/Disks/ObjectStorages/S3/diskSettings.cpp +++ b/src/Disks/ObjectStorages/S3/diskSettings.cpp @@ -60,7 +60,7 @@ std::unique_ptr getClient( uri.uri.getScheme()); client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 1000); - client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 3000); + client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 30000); client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100); client_configuration.endpointOverride = uri.endpoint; client_configuration.http_keep_alive_timeout_ms = config.getUInt( diff --git a/src/IO/ConnectionTimeouts.cpp b/src/IO/ConnectionTimeouts.cpp index a9eebb1a755..90406dcf409 100644 --- a/src/IO/ConnectionTimeouts.cpp +++ b/src/IO/ConnectionTimeouts.cpp @@ -133,22 +133,51 @@ ConnectionTimeouts ConnectionTimeouts::getHTTPTimeouts(const Settings & settings settings.http_receive_timeout); } -ConnectionTimeouts ConnectionTimeouts::aggressiveTimeouts(UInt32 attempt) const +ConnectionTimeouts ConnectionTimeouts::getAdaptiveTimeouts(Aws::Http::HttpMethod method, UInt32 attempt) const { + constexpr size_t first_method_index = size_t(Aws::Http::HttpMethod::HTTP_GET); + constexpr size_t last_method_index = size_t(Aws::Http::HttpMethod::HTTP_PATCH); + constexpr size_t methods_count = last_method_index - first_method_index + 1; + + /// HTTP_POST is used for CompleteMultipartUpload requests. 
+ /// These requests need a longer timeout, especially when minio is used + /// The same assumptions are made for HTTP_DELETE, HTTP_PATCH + /// Those requests are heavier than HTTP_GET, HTTP_HEAD, HTTP_PUT + + static const UInt32 first_attempt_send_receive_timeouts_ms[methods_count][2] = { + /*HTTP_GET*/ {200, 200}, + /*HTTP_POST*/ {200, 30000}, + /*HTTP_DELETE*/ {200, 1000}, + /*HTTP_PUT*/ {200, 200}, + /*HTTP_HEAD*/ {200, 200}, + /*HTTP_PATCH*/ {200, 1000}, + }; + + static const UInt32 second_attempt_send_receive_timeouts_ms[methods_count][2] = { + /*HTTP_GET*/ {1000, 1000}, + /*HTTP_POST*/ {1000, 30000}, + /*HTTP_DELETE*/ {1000, 10000}, + /*HTTP_PUT*/ {1000, 1000}, + /*HTTP_HEAD*/ {1000, 1000}, + /*HTTP_PATCH*/ {1000, 10000}, + }; + + static_assert(methods_count == 6); + static_assert(sizeof(first_attempt_send_receive_timeouts_ms) == sizeof(second_attempt_send_receive_timeouts_ms)); + static_assert(sizeof(first_attempt_send_receive_timeouts_ms) == methods_count * sizeof(UInt32) * 2); + auto aggressive = *this; + if (attempt > 2) + return aggressive; + + auto timeout_map = first_attempt_send_receive_timeouts_ms; if (attempt == 2) - { - auto one_second = Poco::Timespan(1, 0); - aggressive.send_timeout = saturate(one_second, send_timeout); - aggressive.receive_timeout = saturate(one_second, receive_timeout); - } - else if (attempt == 1) - { - auto two_hundred_ms = Poco::Timespan(0, 200 * 1000); - aggressive.send_timeout = saturate(two_hundred_ms, send_timeout); - aggressive.receive_timeout = saturate(two_hundred_ms, receive_timeout); - } + timeout_map = second_attempt_send_receive_timeouts_ms; + + const size_t method_index = size_t(method) - first_method_index; + aggressive.send_timeout = saturate(Poco::Timespan(timeout_map[method_index][0]), send_timeout); + aggressive.receive_timeout = saturate(Poco::Timespan(timeout_map[method_index][1]), receive_timeout); return aggressive; } diff --git a/src/IO/ConnectionTimeouts.h b/src/IO/ConnectionTimeouts.h index 17ee1907d89..0ef133c8378 100644 --- a/src/IO/ConnectionTimeouts.h +++ b/src/IO/ConnectionTimeouts.h @@ -4,6 +4,7 @@ #include #include +#include namespace DB { @@ -68,7 +69,7 @@ struct ConnectionTimeouts static ConnectionTimeouts getTCPTimeoutsWithFailover(const Settings & settings); static ConnectionTimeouts getHTTPTimeouts(const Settings & settings, Poco::Timespan http_keep_alive_timeout); - ConnectionTimeouts aggressiveTimeouts(UInt32 attempt) const; + ConnectionTimeouts getAdaptiveTimeouts(Aws::Http::HttpMethod method, UInt32 attempt) const; }; } diff --git a/src/IO/S3/Client.cpp b/src/IO/S3/Client.cpp index 90806852c1e..4630e68fbb6 100644 --- a/src/IO/S3/Client.cpp +++ b/src/IO/S3/Client.cpp @@ -118,19 +118,9 @@ std::unique_ptr Client::create( new Client(max_redirects_, std::move(sse_kms_config_), credentials_provider, client_configuration, sign_payloads, use_virtual_addressing)); } -std::unique_ptr Client::clone( - std::optional override_use_adaptive_timeouts, - std::optional override_request_timeout_ms) const +std::unique_ptr Client::clone() const { - PocoHTTPClientConfiguration new_configuration = client_configuration; - - if (override_request_timeout_ms.has_value()) - new_configuration.requestTimeoutMs = *override_request_timeout_ms; - - if (override_use_adaptive_timeouts.has_value()) - new_configuration.s3_use_adaptive_timeouts = *override_use_adaptive_timeouts; - - return std::unique_ptr(new Client(*this, new_configuration)); + return std::unique_ptr(new Client(*this, client_configuration)); } namespace diff --git
a/src/IO/S3/Client.h b/src/IO/S3/Client.h index be7235eb9f1..5ad57a9d827 100644 --- a/src/IO/S3/Client.h +++ b/src/IO/S3/Client.h @@ -118,13 +118,7 @@ public: Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads, bool use_virtual_addressing); - /// Create a client with adjusted settings: - /// * override_request_timeout_ms is used to increase timeout for CompleteMultipartUploadRequest - /// because it often sits idle for 10 seconds: https://github.com/ClickHouse/ClickHouse/pull/42321 - /// * s3_use_adaptive_timeouts is used to turn off s3_use_adaptive_timeouts feature for CompleteMultipartUploadRequest - std::unique_ptr clone( - std::optional override_use_adaptive_timeouts = std::nullopt, - std::optional override_request_timeout_ms = std::nullopt) const; + std::unique_ptr clone() const; Client & operator=(const Client &) = delete; diff --git a/src/IO/S3/PocoHTTPClient.cpp b/src/IO/S3/PocoHTTPClient.cpp index f783a886877..b26c36f8029 100644 --- a/src/IO/S3/PocoHTTPClient.cpp +++ b/src/IO/S3/PocoHTTPClient.cpp @@ -300,7 +300,7 @@ ConnectionTimeouts PocoHTTPClient::getTimeouts(Aws::Http::HttpRequest & request) const auto & request_info = request.GetHeaderValue(Aws::Http::SDK_REQUEST_HEADER); auto attempt = extractAttempt(request_info); - return timeouts.aggressiveTimeouts(attempt); + return timeouts.getAdaptiveTimeouts(request.GetMethod(), attempt); } void PocoHTTPClient::makeRequestInternal( diff --git a/src/IO/S3/copyS3File.cpp b/src/IO/S3/copyS3File.cpp index a16a1a41505..30da1c580c1 100644 --- a/src/IO/S3/copyS3File.cpp +++ b/src/IO/S3/copyS3File.cpp @@ -53,7 +53,6 @@ namespace public: UploadHelper( const std::shared_ptr & client_ptr_, - const std::shared_ptr & client_with_long_timeout_ptr_, const String & dest_bucket_, const String & dest_key_, const S3Settings::RequestSettings & request_settings_, @@ -62,7 +61,6 @@ namespace bool for_disk_s3_, const Poco::Logger * log_) : client_ptr(client_ptr_) - , client_with_long_timeout_ptr(client_with_long_timeout_ptr_) , dest_bucket(dest_bucket_) , dest_key(dest_key_) , request_settings(request_settings_) @@ -78,7 +76,6 @@ namespace protected: std::shared_ptr client_ptr; - std::shared_ptr client_with_long_timeout_ptr; const String & dest_bucket; const String & dest_key; const S3Settings::RequestSettings & request_settings; @@ -179,7 +176,7 @@ namespace if (for_disk_s3) ProfileEvents::increment(ProfileEvents::DiskS3CompleteMultipartUpload); - auto outcome = client_with_long_timeout_ptr->CompleteMultipartUpload(request); + auto outcome = client_ptr->CompleteMultipartUpload(request); if (outcome.IsSuccess()) { @@ -433,14 +430,13 @@ namespace size_t offset_, size_t size_, const std::shared_ptr & client_ptr_, - const std::shared_ptr & client_with_long_timeout_ptr_, const String & dest_bucket_, const String & dest_key_, const S3Settings::RequestSettings & request_settings_, const std::optional> & object_metadata_, ThreadPoolCallbackRunner schedule_, bool for_disk_s3_) - : UploadHelper(client_ptr_, client_with_long_timeout_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyDataToS3File")) + : UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyDataToS3File")) , create_read_buffer(create_read_buffer_) , offset(offset_) , size(size_) @@ -602,7 +598,6 @@ namespace public: CopyFileHelper( const std::shared_ptr & client_ptr_, - const std::shared_ptr & client_with_long_timeout_ptr_, const String & 
src_bucket_, const String & src_key_, size_t src_offset_, @@ -614,7 +609,7 @@ namespace const std::optional> & object_metadata_, ThreadPoolCallbackRunner schedule_, bool for_disk_s3_) - : UploadHelper(client_ptr_, client_with_long_timeout_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyS3File")) + : UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyS3File")) , src_bucket(src_bucket_) , src_key(src_key_) , offset(src_offset_) @@ -677,7 +672,7 @@ namespace /// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840 request.SetContentType("binary/octet-stream"); - client_with_long_timeout_ptr->setKMSHeaders(request); + client_ptr->setKMSHeaders(request); } void processCopyRequest(const S3::CopyObjectRequest & request) @@ -689,7 +684,7 @@ namespace if (for_disk_s3) ProfileEvents::increment(ProfileEvents::DiskS3CopyObject); - auto outcome = client_with_long_timeout_ptr->CopyObject(request); + auto outcome = client_ptr->CopyObject(request); if (outcome.IsSuccess()) { LOG_TRACE( @@ -714,7 +709,6 @@ namespace offset, size, client_ptr, - client_with_long_timeout_ptr, dest_bucket, dest_key, request_settings, @@ -788,7 +782,7 @@ namespace if (for_disk_s3) ProfileEvents::increment(ProfileEvents::DiskS3UploadPartCopy); - auto outcome = client_with_long_timeout_ptr->UploadPartCopy(req); + auto outcome = client_ptr->UploadPartCopy(req); if (!outcome.IsSuccess()) { abortMultipartUpload(); @@ -806,7 +800,6 @@ void copyDataToS3File( size_t offset, size_t size, const std::shared_ptr & dest_s3_client, - const std::shared_ptr & dest_s3_client_with_long_timeout, const String & dest_bucket, const String & dest_key, const S3Settings::RequestSettings & settings, @@ -814,14 +807,13 @@ void copyDataToS3File( ThreadPoolCallbackRunner schedule, bool for_disk_s3) { - CopyDataToFileHelper helper{create_read_buffer, offset, size, dest_s3_client, dest_s3_client_with_long_timeout, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3}; + CopyDataToFileHelper helper{create_read_buffer, offset, size, dest_s3_client, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3}; helper.performCopy(); } void copyS3File( const std::shared_ptr & s3_client, - const std::shared_ptr & s3_client_with_long_timeout, const String & src_bucket, const String & src_key, size_t src_offset, @@ -836,7 +828,7 @@ void copyS3File( { if (settings.allow_native_copy) { - CopyFileHelper helper{s3_client, s3_client_with_long_timeout, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, read_settings, object_metadata, schedule, for_disk_s3}; + CopyFileHelper helper{s3_client, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, read_settings, object_metadata, schedule, for_disk_s3}; helper.performCopy(); } else @@ -845,7 +837,7 @@ void copyS3File( { return std::make_unique(s3_client, src_bucket, src_key, "", settings, read_settings); }; - copyDataToS3File(create_read_buffer, src_offset, src_size, s3_client, s3_client_with_long_timeout, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3); + copyDataToS3File(create_read_buffer, src_offset, src_size, s3_client, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3); } } diff --git a/src/IO/S3/copyS3File.h b/src/IO/S3/copyS3File.h index 1bcbfd7735e..33e22fdfba2 100644 --- 
a/src/IO/S3/copyS3File.h +++ b/src/IO/S3/copyS3File.h @@ -27,15 +27,9 @@ using CreateReadBuffer = std::function()>; /// because it is a known issue, it is fallbacks to read-write copy /// (copyDataToS3File()). /// -/// s3_client_with_long_timeout (may be equal to s3_client) is used for native copy and -/// CompleteMultipartUpload requests. These requests need longer timeout because S3 servers often -/// block on them for multiple seconds without sending or receiving data from us (maybe the servers -/// are copying data internally, or maybe throttling, idk). -/// /// read_settings - is used for throttling in case of native copy is not possible void copyS3File( const std::shared_ptr & s3_client, - const std::shared_ptr & s3_client_with_long_timeout, const String & src_bucket, const String & src_key, size_t src_offset, @@ -58,7 +52,6 @@ void copyDataToS3File( size_t offset, size_t size, const std::shared_ptr & dest_s3_client, - const std::shared_ptr & dest_s3_client_with_long_timeout, const String & dest_bucket, const String & dest_key, const S3Settings::RequestSettings & settings, diff --git a/src/IO/S3/tests/gtest_aws_s3_client.cpp b/src/IO/S3/tests/gtest_aws_s3_client.cpp index c42f14e9a53..d4b9a017398 100644 --- a/src/IO/S3/tests/gtest_aws_s3_client.cpp +++ b/src/IO/S3/tests/gtest_aws_s3_client.cpp @@ -91,7 +91,6 @@ void doWriteRequest(std::shared_ptr client, const DB::S3:: DB::S3Settings::RequestSettings request_settings; request_settings.max_unexpected_write_error_retries = max_unexpected_write_error_retries; DB::WriteBufferFromS3 write_buffer( - client, client, uri.bucket, uri.key, diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index e1b9c17efe9..62d0c80f1f2 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -77,7 +77,6 @@ struct WriteBufferFromS3::PartData WriteBufferFromS3::WriteBufferFromS3( std::shared_ptr client_ptr_, - std::shared_ptr client_with_long_timeout_ptr_, const String & bucket_, const String & key_, size_t buf_size_, @@ -92,7 +91,6 @@ WriteBufferFromS3::WriteBufferFromS3( , upload_settings(request_settings.getUploadSettings()) , write_settings(write_settings_) , client_ptr(std::move(client_ptr_)) - , client_with_long_timeout_ptr(std::move(client_with_long_timeout_ptr_)) , object_metadata(std::move(object_metadata_)) , buffer_allocation_policy(ChooseBufferPolicy(upload_settings)) , task_tracker( @@ -566,7 +564,7 @@ void WriteBufferFromS3::completeMultipartUpload() ProfileEvents::increment(ProfileEvents::DiskS3CompleteMultipartUpload); Stopwatch watch; - auto outcome = client_with_long_timeout_ptr->CompleteMultipartUpload(req); + auto outcome = client_ptr->CompleteMultipartUpload(req); watch.stop(); ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds()); diff --git a/src/IO/WriteBufferFromS3.h b/src/IO/WriteBufferFromS3.h index 95148c49779..590342cc997 100644 --- a/src/IO/WriteBufferFromS3.h +++ b/src/IO/WriteBufferFromS3.h @@ -30,8 +30,6 @@ class WriteBufferFromS3 final : public WriteBufferFromFileBase public: WriteBufferFromS3( std::shared_ptr client_ptr_, - /// for CompleteMultipartUploadRequest, because it blocks on recv() for a few seconds on big uploads - std::shared_ptr client_with_long_timeout_ptr_, const String & bucket_, const String & key_, size_t buf_size_, @@ -90,7 +88,6 @@ private: const S3Settings::RequestSettings::PartUploadSettings & upload_settings; const WriteSettings write_settings; const std::shared_ptr client_ptr; - const std::shared_ptr 
client_with_long_timeout_ptr; const std::optional> object_metadata; Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3"); LogSeriesLimiterPtr limitedLog = std::make_shared(log, 1, 5); diff --git a/src/IO/tests/gtest_writebuffer_s3.cpp b/src/IO/tests/gtest_writebuffer_s3.cpp index 21bdd9a6f26..c82f97f8b20 100644 --- a/src/IO/tests/gtest_writebuffer_s3.cpp +++ b/src/IO/tests/gtest_writebuffer_s3.cpp @@ -549,7 +549,6 @@ public: getAsyncPolicy().setAutoExecute(false); return std::make_unique( - client, client, bucket, file_name, diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 231efb87e87..b0cd40a2e05 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -824,7 +824,6 @@ public: write_buf = wrapWriteBufferWithCompressionMethod( std::make_unique( configuration_.client, - configuration_.client_with_long_timeout, bucket, key, DBMS_DEFAULT_BUFFER_SIZE, @@ -1329,8 +1328,6 @@ void StorageS3::Configuration::connect(ContextPtr context) context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)), auth_settings.no_sign_request.value_or(context->getConfigRef().getBool("s3.no_sign_request", false)), }); - - client_with_long_timeout = client->clone(/*override_use_adaptive_timeouts*/ false, request_settings.long_request_timeout_ms); } void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection) diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index 3330ac6c210..3f35c578e19 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -311,7 +311,6 @@ public: HTTPHeaderEntries headers_from_ast; std::shared_ptr client; - std::shared_ptr client_with_long_timeout; std::vector keys; }; diff --git a/src/Storages/StorageS3Settings.h b/src/Storages/StorageS3Settings.h index e3d577ca0b3..728972c948c 100644 --- a/src/Storages/StorageS3Settings.h +++ b/src/Storages/StorageS3Settings.h @@ -69,8 +69,7 @@ struct S3Settings ThrottlerPtr get_request_throttler; ThrottlerPtr put_request_throttler; size_t retry_attempts = 10; - size_t request_timeout_ms = 3000; - size_t long_request_timeout_ms = 30000; // TODO: Take this from config like request_timeout_ms + size_t request_timeout_ms = 30000; bool allow_native_copy = true; bool throw_on_zero_files_match = false; From 3075bd97450d42f00320b18c1b177fd700a19bec Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Fri, 10 Nov 2023 15:15:24 +0100 Subject: [PATCH 12/19] track clickhouse high level retries --- base/poco/Net/src/HTTPSession.cpp | 4 +- src/IO/ConnectionTimeouts.cpp | 101 ++++++++++++------ src/IO/ConnectionTimeouts.h | 3 +- src/IO/HTTPCommon.cpp | 12 +-- src/IO/HTTPCommon.h | 2 + src/IO/ReadBufferFromS3.cpp | 24 +++-- src/IO/ReadBufferFromS3.h | 4 +- src/IO/S3/PocoHTTPClient.cpp | 89 ++++++++------- src/IO/S3/PocoHTTPClient.h | 2 +- src/IO/S3/tests/gtest_aws_s3_client.cpp | 2 + .../configs/inf_s3_retries.xml | 2 +- .../configs/s3_retries.xml | 2 +- .../configs/storage_conf.xml | 1 + .../test_checking_s3_blobs_paranoid/test.py | 16 +-- .../test_storage_s3/configs/s3_retry.xml | 1 + .../s3_mocks/unstable_server.py | 17 ++- tests/integration/test_storage_s3/test.py | 9 ++ 17 files changed, 181 insertions(+), 110 deletions(-) diff --git a/base/poco/Net/src/HTTPSession.cpp b/base/poco/Net/src/HTTPSession.cpp index d30f5590280..9ebbd7d04cd 100644 --- a/base/poco/Net/src/HTTPSession.cpp +++ b/base/poco/Net/src/HTTPSession.cpp @@ -95,7 +95,7 @@ void HTTPSession::setTimeout(const 
Poco::Timespan& connectionTimeout, const Poco { _connectionTimeout = connectionTimeout; - if (_sendTimeout != sendTimeout) + if (_sendTimeout.totalMicroseconds() != sendTimeout.totalMicroseconds()) { _sendTimeout = sendTimeout; @@ -103,7 +103,7 @@ void HTTPSession::setTimeout(const Poco::Timespan& connectionTimeout, const Poco _socket.setSendTimeout(_sendTimeout); } - if (_receiveTimeout != receiveTimeout) + if (_receiveTimeout.totalMicroseconds() != receiveTimeout.totalMicroseconds()) { _receiveTimeout = receiveTimeout; diff --git a/src/IO/ConnectionTimeouts.cpp b/src/IO/ConnectionTimeouts.cpp index 90406dcf409..970afc75ec3 100644 --- a/src/IO/ConnectionTimeouts.cpp +++ b/src/IO/ConnectionTimeouts.cpp @@ -133,51 +133,84 @@ ConnectionTimeouts ConnectionTimeouts::getHTTPTimeouts(const Settings & settings settings.http_receive_timeout); } -ConnectionTimeouts ConnectionTimeouts::getAdaptiveTimeouts(Aws::Http::HttpMethod method, UInt32 attempt) const +class SendReceiveTimeoutsForFirstAttempt { - constexpr size_t first_method_index = size_t(Aws::Http::HttpMethod::HTTP_GET); - constexpr size_t last_method_index = size_t(Aws::Http::HttpMethod::HTTP_PATCH); - constexpr size_t methods_count = last_method_index - first_method_index + 1; +private: + static constexpr size_t known_methods_count = 6; + using KnownMethodsArray = std::array; + static const KnownMethodsArray known_methods; - /// HTTP_POST is used for CompleteMultipartUpload requests. - /// These requests need a longer timeout, especially when minio is used + /// HTTP_POST is used for CompleteMultipartUpload requests. Its latency could be high. + /// These requests need a longer timeout, especially when minio is used. /// The same assumptions are made for HTTP_DELETE, HTTP_PATCH /// Those requests are heavier than HTTP_GET, HTTP_HEAD, HTTP_PUT - static const UInt32 first_attempt_send_receive_timeouts_ms[methods_count][2] = { - /*HTTP_GET*/ {200, 200}, - /*HTTP_POST*/ {200, 30000}, - /*HTTP_DELETE*/ {200, 1000}, - /*HTTP_PUT*/ {200, 200}, - /*HTTP_HEAD*/ {200, 200}, - /*HTTP_PATCH*/ {200, 1000}, + static constexpr Poco::Timestamp::TimeDiff first_byte_ms[known_methods_count][2] = + { + /* GET */ {200, 200}, + /* POST */ {200, 200}, + /* DELETE */ {200, 200}, + /* PUT */ {200, 200}, + /* HEAD */ {200, 200}, + /* PATCH */ {200, 200}, }; - static const UInt32 second_attempt_send_receive_timeouts_ms[methods_count][2] = { - /*HTTP_GET*/ {1000, 1000}, - /*HTTP_POST*/ {1000, 30000}, - /*HTTP_DELETE*/ {1000, 10000}, - /*HTTP_PUT*/ {1000, 1000}, - /*HTTP_HEAD*/ {1000, 1000}, - /*HTTP_PATCH*/ {1000, 10000}, + static constexpr Poco::Timestamp::TimeDiff rest_bytes_ms[known_methods_count][2] = + { + /* GET */ {500, 500}, + /* POST */ {1000, 30000}, + /* DELETE */ {1000, 10000}, + /* PUT */ {1000, 3000}, + /* HEAD */ {500, 500}, + /* PATCH */ {1000, 10000}, }; - static_assert(methods_count == 6); - static_assert(sizeof(first_attempt_send_receive_timeouts_ms) == sizeof(second_attempt_send_receive_timeouts_ms)); - static_assert(sizeof(first_attempt_send_receive_timeouts_ms) == methods_count * sizeof(UInt32) * 2); + static_assert(sizeof(first_byte_ms) == sizeof(rest_bytes_ms)); + static_assert(sizeof(first_byte_ms) == known_methods_count * sizeof(Poco::Timestamp::TimeDiff) * 2); + + static size_t getMethodIndex(const String & method) + { + KnownMethodsArray::const_iterator it = std::find(known_methods.begin(), known_methods.end(), method); + chassert(it != known_methods.end()); + if (it == known_methods.end()) + return 0; + return
std::distance(known_methods.begin(), it); + } + +public: + static std::pair getSendReceiveTimeout(const String & method, bool first_byte) + { + auto idx = getMethodIndex(method); + + if (first_byte) + return std::make_pair( + Poco::Timespan(first_byte_ms[idx][0] * 1000), + Poco::Timespan(first_byte_ms[idx][1] * 1000) + ); + + return std::make_pair( + Poco::Timespan(rest_bytes_ms[idx][0] * 1000), + Poco::Timespan(rest_bytes_ms[idx][1] * 1000) + ); + } +}; + +const SendReceiveTimeoutsForFirstAttempt::KnownMethodsArray SendReceiveTimeoutsForFirstAttempt::known_methods = +{ + "GET", "POST", "DELETE", "PUT", "HEAD", "PATCH" +}; + + +ConnectionTimeouts ConnectionTimeouts::getAdaptiveTimeouts(const String & method, bool first_attempt, bool first_byte) const +{ + if (!first_attempt) + return *this; + + auto [send, recv] = SendReceiveTimeoutsForFirstAttempt::getSendReceiveTimeout(method, first_byte); auto aggressive = *this; - - if (attempt > 2) - return aggressive; - - auto timeout_map = first_attempt_send_receive_timeouts_ms; - if (attempt == 2) - timeout_map = second_attempt_send_receive_timeouts_ms; - - const size_t method_index = size_t(method) - first_method_index; - aggressive.send_timeout = saturate(Poco::Timespan(timeout_map[method_index][0]), send_timeout); - aggressive.receive_timeout = saturate(Poco::Timespan(timeout_map[method_index][1]), receive_timeout); + aggressive.send_timeout = saturate(send, send_timeout); + aggressive.receive_timeout = saturate(recv, receive_timeout); return aggressive; } diff --git a/src/IO/ConnectionTimeouts.h b/src/IO/ConnectionTimeouts.h index 0ef133c8378..aabebdb836d 100644 --- a/src/IO/ConnectionTimeouts.h +++ b/src/IO/ConnectionTimeouts.h @@ -4,7 +4,6 @@ #include #include -#include namespace DB { @@ -69,7 +68,7 @@ struct ConnectionTimeouts static ConnectionTimeouts getTCPTimeoutsWithFailover(const Settings & settings); static ConnectionTimeouts getHTTPTimeouts(const Settings & settings, Poco::Timespan http_keep_alive_timeout); - ConnectionTimeouts getAdaptiveTimeouts(Aws::Http::HttpMethod method, UInt32 attempt) const; + ConnectionTimeouts getAdaptiveTimeouts(const String & method, bool first_attempt, bool first_byte) const; }; } diff --git a/src/IO/HTTPCommon.cpp b/src/IO/HTTPCommon.cpp index 65ffa51a466..cce394c67c9 100644 --- a/src/IO/HTTPCommon.cpp +++ b/src/IO/HTTPCommon.cpp @@ -50,12 +50,6 @@ namespace ErrorCodes namespace { - void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts) - { - session.setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout); - session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout); - } - Poco::Net::HTTPClientSession::ProxyConfig proxyConfigurationToPocoProxyConfig(const ProxyConfiguration & proxy_configuration) { Poco::Net::HTTPClientSession::ProxyConfig poco_proxy_config; @@ -359,6 +353,12 @@ namespace }; } +void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts) +{ + session.setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout); + session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout); +} + void setResponseDefaultHeaders(HTTPServerResponse & response, size_t keep_alive_timeout) { if (!response.getKeepAlive()) diff --git a/src/IO/HTTPCommon.h b/src/IO/HTTPCommon.h index de62b5d5c16..c9968fc6915 100644 --- a/src/IO/HTTPCommon.h +++ b/src/IO/HTTPCommon.h @@ -113,4 +113,6 @@ std::istream * receiveResponse( void assertResponseIsOk( const Poco::Net::HTTPRequest & 
request, Poco::Net::HTTPResponse & response, std::istream & istr, bool allow_redirects = false); + +void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts); } diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index f19978ccb47..c9c9319c44c 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -167,9 +167,9 @@ bool ReadBufferFromS3::nextImpl() } size_t sleep_time_with_backoff_milliseconds = 100; - for (size_t attempt = 0; !next_result; ++attempt) + for (size_t attempt = 1; !next_result; ++attempt) { - bool last_attempt = attempt + 1 >= request_settings.max_single_read_retries; + bool last_attempt = attempt >= request_settings.max_single_read_retries; ProfileEventTimeIncrement watch(ProfileEvents::ReadBufferFromS3Microseconds); @@ -177,7 +177,7 @@ bool ReadBufferFromS3::nextImpl() { if (!impl) { - impl = initialize(); + impl = initialize(attempt); if (use_external_buffer) { @@ -232,9 +232,9 @@ size_t ReadBufferFromS3::readBigAt(char * to, size_t n, size_t range_begin, cons { size_t initial_n = n; size_t sleep_time_with_backoff_milliseconds = 100; - for (size_t attempt = 0; n > 0; ++attempt) + for (size_t attempt = 1; n > 0; ++attempt) { - bool last_attempt = attempt + 1 >= request_settings.max_single_read_retries; + bool last_attempt = attempt >= request_settings.max_single_read_retries; size_t bytes_copied = 0; ProfileEventTimeIncrement watch(ProfileEvents::ReadBufferFromS3Microseconds); @@ -266,7 +266,7 @@ size_t ReadBufferFromS3::readBigAt(char * to, size_t n, size_t range_begin, cons try { - result = sendRequest(range_begin, range_begin + n - 1); + result = sendRequest(attempt, range_begin, range_begin + n - 1); std::istream & istr = result->GetBody(); copyFromIStreamWithProgressCallback(istr, to, n, progress_callback, &bytes_copied); @@ -304,8 +304,8 @@ bool ReadBufferFromS3::processException(Poco::Exception & e, size_t read_offset, LOG_DEBUG( log, "Caught exception while reading S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}, " - "Attempt: {}, Message: {}", - bucket, key, version_id.empty() ? "Latest" : version_id, read_offset, attempt, e.message()); + "Attempt: {}/{}, Message: {}", + bucket, key, version_id.empty() ? "Latest" : version_id, read_offset, attempt, request_settings.max_single_read_retries, e.message()); if (auto * s3_exception = dynamic_cast(&e)) @@ -463,7 +463,7 @@ ReadBufferFromS3::~ReadBufferFromS3() } } -std::unique_ptr ReadBufferFromS3::initialize() +std::unique_ptr ReadBufferFromS3::initialize(size_t attempt) { resetSessionIfNeeded(readAllRangeSuccessfully(), read_result); read_all_range_successfully = false; @@ -475,13 +475,13 @@ std::unique_ptr ReadBufferFromS3::initialize() if (read_until_position && offset >= read_until_position) throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1); - read_result = sendRequest(offset, read_until_position ? std::make_optional(read_until_position - 1) : std::nullopt); + read_result = sendRequest(attempt, offset, read_until_position ? std::make_optional(read_until_position - 1) : std::nullopt); size_t buffer_size = use_external_buffer ? 
0 : read_settings.remote_fs_buffer_size; return std::make_unique(read_result->GetBody(), buffer_size); } -Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t range_begin, std::optional range_end_incl) const +Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, size_t range_begin, std::optional range_end_incl) const { S3::GetObjectRequest req; req.SetBucket(bucket); @@ -489,6 +489,8 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t range_begin if (!version_id.empty()) req.SetVersionId(version_id); + req.SetAdditionalCustomHeaderValue("clickhouse-request", fmt::format("attempt={}", attempt)); + if (range_end_incl) { req.SetRange(fmt::format("bytes={}-{}", range_begin, *range_end_incl)); diff --git a/src/IO/ReadBufferFromS3.h b/src/IO/ReadBufferFromS3.h index 0835e52a5b2..101e25f8b43 100644 --- a/src/IO/ReadBufferFromS3.h +++ b/src/IO/ReadBufferFromS3.h @@ -79,7 +79,7 @@ public: bool supportsReadAt() override { return true; } private: - std::unique_ptr initialize(); + std::unique_ptr initialize(size_t attempt); /// If true, if we destroy impl now, no work was wasted. Just for metrics. bool atEndOfRequestedRangeGuess(); @@ -88,7 +88,7 @@ private: /// Returns true if the error looks retriable. bool processException(Poco::Exception & e, size_t read_offset, size_t attempt) const; - Aws::S3::Model::GetObjectResult sendRequest(size_t range_begin, std::optional range_end_incl) const; + Aws::S3::Model::GetObjectResult sendRequest(size_t attempt, size_t range_begin, std::optional range_end_incl) const; bool readAllRangeSuccessfully() const; diff --git a/src/IO/S3/PocoHTTPClient.cpp b/src/IO/S3/PocoHTTPClient.cpp index b26c36f8029..904e2324145 100644 --- a/src/IO/S3/PocoHTTPClient.cpp +++ b/src/IO/S3/PocoHTTPClient.cpp @@ -272,35 +272,36 @@ void PocoHTTPClient::addMetric(const Aws::Http::HttpRequest & request, S3MetricT ProfileEvents::increment(disk_s3_events_map[static_cast(type)][static_cast(kind)], amount); } -UInt32 extractAttempt(const Aws::String & request_info) +String extractAttemptFromInfo(const Aws::String & request_info) { static auto key = Aws::String("attempt="); auto key_begin = request_info.find(key, 0); if (key_begin == Aws::String::npos) - return 1; + return "1"; auto val_begin = key_begin + key.size(); auto val_end = request_info.find(';', val_begin); if (val_end == Aws::String::npos) val_end = request_info.size(); - Aws::String value = request_info.substr(val_begin, val_end-val_begin); - - UInt32 attempt = 1; - ReadBufferFromString buf(value); - readIntText(attempt, buf); - return attempt; + return request_info.substr(val_begin, val_end-val_begin); } -ConnectionTimeouts PocoHTTPClient::getTimeouts(Aws::Http::HttpRequest & request) const +String getOrEmpty(const Aws::Http::HeaderValueCollection & map, const String & key) +{ + auto it = map.find(key); + if (it == map.end()) + return {}; + return it->second; +} + +ConnectionTimeouts PocoHTTPClient::getTimeouts(const String & method, bool first_attempt, bool first_byte) const { if (!s3_use_adaptive_timeouts) return timeouts; - const auto & request_info = request.GetHeaderValue(Aws::Http::SDK_REQUEST_HEADER); - auto attempt = extractAttempt(request_info); - return timeouts.getAdaptiveTimeouts(request.GetMethod(), attempt); + return timeouts.getAdaptiveTimeouts(method, first_attempt, first_byte); } void PocoHTTPClient::makeRequestInternal( @@ -317,6 +318,25 @@ void PocoHTTPClient::makeRequestInternal( makeRequestInternalImpl(request, request_configuration, response, 
readLimiter, writeLimiter); } +String getMethod(const Aws::Http::HttpRequest & request) +{ + switch (request.GetMethod()) + { + case Aws::Http::HttpMethod::HTTP_GET: + return Poco::Net::HTTPRequest::HTTP_GET; + case Aws::Http::HttpMethod::HTTP_POST: + return Poco::Net::HTTPRequest::HTTP_POST; + case Aws::Http::HttpMethod::HTTP_DELETE: + return Poco::Net::HTTPRequest::HTTP_DELETE; + case Aws::Http::HttpMethod::HTTP_PUT: + return Poco::Net::HTTPRequest::HTTP_PUT; + case Aws::Http::HttpMethod::HTTP_HEAD: + return Poco::Net::HTTPRequest::HTTP_HEAD; + case Aws::Http::HttpMethod::HTTP_PATCH: + return Poco::Net::HTTPRequest::HTTP_PATCH; + } +} + template void PocoHTTPClient::makeRequestInternalImpl( Aws::Http::HttpRequest & request, @@ -330,9 +350,14 @@ void PocoHTTPClient::makeRequestInternalImpl( Poco::Logger * log = &Poco::Logger::get("AWSClient"); auto uri = request.GetUri().GetURIString(); + auto method = getMethod(request); + + auto sdk_attempt = extractAttemptFromInfo(getOrEmpty(request.GetHeaders(), Aws::Http::SDK_REQUEST_HEADER)); + auto ch_attempt = extractAttemptFromInfo(getOrEmpty(request.GetHeaders(), "clickhouse-request")); + bool first_attempt = ch_attempt == "1" && sdk_attempt == "1"; if (enable_s3_requests_logging) - LOG_TEST(log, "Make request to: {}", uri); + LOG_TEST(log, "Make request to: {}, aws sdk attempt: {}, clickhouse attempt: {}", uri, sdk_attempt, ch_attempt); switch (request.GetMethod()) { @@ -383,17 +408,17 @@ void PocoHTTPClient::makeRequestInternalImpl( /// This can lead to request signature difference on S3 side. if constexpr (pooled) session = makePooledHTTPSession( - target_uri, getTimeouts(request), http_connection_pool_size, wait_on_pool_size_limit, proxy_configuration); + target_uri, getTimeouts(method, first_attempt), http_connection_pool_size, wait_on_pool_size_limit, proxy_configuration); else - session = makeHTTPSession(target_uri, getTimeouts(request), proxy_configuration); + session = makeHTTPSession(target_uri, getTimeouts(method, first_attempt), proxy_configuration); } else { if constexpr (pooled) session = makePooledHTTPSession( - target_uri, getTimeouts(request), http_connection_pool_size, wait_on_pool_size_limit); + target_uri, getTimeouts(method, first_attempt), http_connection_pool_size, wait_on_pool_size_limit); else - session = makeHTTPSession(target_uri, getTimeouts(request)); + session = makeHTTPSession(target_uri, getTimeouts(method, first_attempt)); } /// In case of error this address will be written to logs @@ -427,28 +452,7 @@ void PocoHTTPClient::makeRequestInternalImpl( path_and_query = "/"; poco_request.setURI(path_and_query); - - switch (request.GetMethod()) - { - case Aws::Http::HttpMethod::HTTP_GET: - poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_GET); - break; - case Aws::Http::HttpMethod::HTTP_POST: - poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_POST); - break; - case Aws::Http::HttpMethod::HTTP_DELETE: - poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_DELETE); - break; - case Aws::Http::HttpMethod::HTTP_PUT: - poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_PUT); - break; - case Aws::Http::HttpMethod::HTTP_HEAD: - poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_HEAD); - break; - case Aws::Http::HttpMethod::HTTP_PATCH: - poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_PATCH); - break; - } + poco_request.setMethod(method); /// Headers coming from SDK are lower-cased. 
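A note on the attempt bookkeeping above: the SDK reports its retry count in the amz-sdk-request header and ClickHouse reports its own in clickhouse-request, and a request gets the tight first-attempt timeouts only when both counters read 1. A standalone sketch of the parsing, assuming the "attempt=N; max=M" header shape (it mirrors extractAttemptFromInfo; the sample values are illustrative):

    #include <cassert>
    #include <string>

    /// Pull the value of "attempt=" out of a header such as "attempt=2; max=10".
    /// A missing key means the request was never retried, so it counts as attempt 1.
    std::string extractAttempt(const std::string & request_info)
    {
        static const std::string key = "attempt=";

        auto key_begin = request_info.find(key);
        if (key_begin == std::string::npos)
            return "1";

        auto val_begin = key_begin + key.size();
        auto val_end = request_info.find(';', val_begin);
        if (val_end == std::string::npos)
            val_end = request_info.size();

        return request_info.substr(val_begin, val_end - val_begin);
    }

    int main()
    {
        assert(extractAttempt("attempt=2; max=10") == "2"); /// SDK-level retry
        assert(extractAttempt("attempt=3") == "3");         /// high-level retry, no "max" part
        assert(extractAttempt("") == "1");                  /// absent header defaults to 1

        /// Tight timeouts apply only when both layers report their first attempt.
        bool first_attempt = extractAttempt("attempt=1; max=10") == "1"
            && extractAttempt("attempt=1") == "1";
        assert(first_attempt);
        return 0;
    }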
for (const auto & [header_name, header_value] : request.GetHeaders()) @@ -473,6 +477,7 @@ void PocoHTTPClient::makeRequestInternalImpl( request.GetContentBody()->clear(); request.GetContentBody()->seekg(0); + setTimeouts(*session, getTimeouts(method, first_attempt, /*first_byte*/ false)); auto size = Poco::StreamCopier::copyStream(*request.GetContentBody(), request_body_stream); if (enable_s3_requests_logging) LOG_TEST(log, "Written {} bytes to request body", size); @@ -482,6 +487,8 @@ void PocoHTTPClient::makeRequestInternalImpl( LOG_TEST(log, "Receiving response..."); auto & response_body_stream = session->receiveResponse(poco_response); + setTimeouts(*session, getTimeouts(method, first_attempt, /*first_byte*/ false)); + watch.stop(); addMetric(request, S3MetricType::Microseconds, watch.elapsedMicroseconds()); @@ -533,6 +540,7 @@ void PocoHTTPClient::makeRequestInternalImpl( /// Request is successful but for some special requests we can have actual error message in body if (status_code >= SUCCESS_RESPONSE_MIN && status_code <= SUCCESS_RESPONSE_MAX && checkRequestCanReturn2xxAndErrorInBody(request)) { + /// reading the full response std::string response_string((std::istreambuf_iterator(response_body_stream)), std::istreambuf_iterator()); @@ -547,7 +555,6 @@ void PocoHTTPClient::makeRequestInternalImpl( addMetric(request, S3MetricType::Errors); if (error_report) error_report(proxy_configuration); - } /// Set response from string @@ -566,6 +573,8 @@ void PocoHTTPClient::makeRequestInternalImpl( if (status_code >= 500 && error_report) error_report(proxy_configuration); } + + /// expose stream, after that client reads data from that stream without built-in retries response->SetResponseBody(response_body_stream, session); } diff --git a/src/IO/S3/PocoHTTPClient.h b/src/IO/S3/PocoHTTPClient.h index 9ba5f4ffe64..14c4fec5dd7 100644 --- a/src/IO/S3/PocoHTTPClient.h +++ b/src/IO/S3/PocoHTTPClient.h @@ -171,7 +171,7 @@ private: Aws::Utils::RateLimits::RateLimiterInterface * readLimiter, Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const; - ConnectionTimeouts getTimeouts(Aws::Http::HttpRequest & request) const; + ConnectionTimeouts getTimeouts(const String & method, bool first_attempt, bool first_byte = true) const; protected: static S3MetricKind getMetricKind(const Aws::Http::HttpRequest & request); diff --git a/src/IO/S3/tests/gtest_aws_s3_client.cpp b/src/IO/S3/tests/gtest_aws_s3_client.cpp index d4b9a017398..bff9ca6fa7b 100644 --- a/src/IO/S3/tests/gtest_aws_s3_client.cpp +++ b/src/IO/S3/tests/gtest_aws_s3_client.cpp @@ -170,6 +170,7 @@ TEST(IOTestAwsS3Client, AppendExtraSSECHeadersRead) "authorization: ... SignedHeaders=" "amz-sdk-invocation-id;" "amz-sdk-request;" + "clickhouse-request;" "content-type;" "host;" "x-amz-api-version;" @@ -215,6 +216,7 @@ TEST(IOTestAwsS3Client, AppendExtraSSEKMSHeadersRead) "authorization: ... 
SignedHeaders=" "amz-sdk-invocation-id;" "amz-sdk-request;" + "clickhouse-request;" "content-type;" "host;" "x-amz-api-version;" diff --git a/tests/integration/test_checking_s3_blobs_paranoid/configs/inf_s3_retries.xml b/tests/integration/test_checking_s3_blobs_paranoid/configs/inf_s3_retries.xml index 5f0860ac120..4210c13b727 100644 --- a/tests/integration/test_checking_s3_blobs_paranoid/configs/inf_s3_retries.xml +++ b/tests/integration/test_checking_s3_blobs_paranoid/configs/inf_s3_retries.xml @@ -4,7 +4,7 @@ 1000000 - 1 + 1 diff --git a/tests/integration/test_checking_s3_blobs_paranoid/configs/s3_retries.xml b/tests/integration/test_checking_s3_blobs_paranoid/configs/s3_retries.xml index f215a89f613..95a313ea4f2 100644 --- a/tests/integration/test_checking_s3_blobs_paranoid/configs/s3_retries.xml +++ b/tests/integration/test_checking_s3_blobs_paranoid/configs/s3_retries.xml @@ -4,7 +4,7 @@ 5 - 0 + 0 diff --git a/tests/integration/test_checking_s3_blobs_paranoid/configs/storage_conf.xml b/tests/integration/test_checking_s3_blobs_paranoid/configs/storage_conf.xml index 264c411b59b..7b1f503ed55 100644 --- a/tests/integration/test_checking_s3_blobs_paranoid/configs/storage_conf.xml +++ b/tests/integration/test_checking_s3_blobs_paranoid/configs/storage_conf.xml @@ -18,6 +18,7 @@ http://resolver:8083/root/data/ minio minio123 + 1 diff --git a/tests/integration/test_checking_s3_blobs_paranoid/test.py b/tests/integration/test_checking_s3_blobs_paranoid/test.py index 441a5a541e8..b000ccabcf4 100644 --- a/tests/integration/test_checking_s3_blobs_paranoid/test.py +++ b/tests/integration/test_checking_s3_blobs_paranoid/test.py @@ -556,7 +556,7 @@ def test_query_is_canceled_with_inf_retries(cluster, broken_s3): @pytest.mark.parametrize("node_name", ["node", "node_with_inf_s3_retries"]) -def test_aggressive_timeouts(cluster, broken_s3, node_name): +def test_adaptive_timeouts(cluster, broken_s3, node_name): node = cluster.instances[node_name] broken_s3.setup_fake_puts(part_length=1) @@ -565,12 +565,12 @@ def test_aggressive_timeouts(cluster, broken_s3, node_name): count=1000000, ) - insert_query_id = f"TEST_AGGRESSIVE_TIMEOUTS_{node_name}" + insert_query_id = f"TEST_ADAPTIVE_TIMEOUTS_{node_name}" node.query( f""" INSERT INTO TABLE FUNCTION s3( - 'http://resolver:8083/root/data/aggressive_timeouts', + 'http://resolver:8083/root/data/adaptive_timeouts', 'minio', 'minio123', 'CSV', auto, 'none' ) @@ -593,20 +593,20 @@ def test_aggressive_timeouts(cluster, broken_s3, node_name): assert put_objects == 1 - s3_aggressive_timeouts_state = node.query( + s3_use_adaptive_timeouts = node.query( f""" SELECT value FROM system.settings WHERE - name='s3_aggressive_timeouts' + name='s3_use_adaptive_timeouts' """ ).strip() if node_name == "node_with_inf_s3_retries": # first 2 attempts failed - assert s3_aggressive_timeouts_state == "1" - assert s3_errors == 2 + assert s3_use_adaptive_timeouts == "1" + assert s3_errors == 1 else: - assert s3_aggressive_timeouts_state == "0" + assert s3_use_adaptive_timeouts == "0" assert s3_errors == 0 diff --git a/tests/integration/test_storage_s3/configs/s3_retry.xml b/tests/integration/test_storage_s3/configs/s3_retry.xml index b7a7bbc8a9b..3171da051d0 100644 --- a/tests/integration/test_storage_s3/configs/s3_retry.xml +++ b/tests/integration/test_storage_s3/configs/s3_retry.xml @@ -1,6 +1,7 @@ + 1 10 5 diff --git a/tests/integration/test_storage_s3/s3_mocks/unstable_server.py b/tests/integration/test_storage_s3/s3_mocks/unstable_server.py index 103dd30340c..5ef781bdc9e 100644 --- 
a/tests/integration/test_storage_s3/s3_mocks/unstable_server.py +++ b/tests/integration/test_storage_s3/s3_mocks/unstable_server.py @@ -4,6 +4,7 @@ import re import socket import struct import sys +import time def gen_n_digit_number(n): @@ -39,14 +40,14 @@ random.seed("Unstable server/1.0") # Generating some "random" data and append a line which contains sum of numbers in column 4. lines = ( - b"".join((gen_line() for _ in range(500000))) + b"".join([gen_line() for _ in range(500000)]) + f"0,0,0,{-sum_in_4_column}\n".encode() ) class RequestHandler(http.server.BaseHTTPRequestHandler): def do_HEAD(self): - if self.path == "/root/test.csv": + if self.path == "/root/test.csv" or self.path == "/root/slow_send_test.csv": self.from_bytes = 0 self.end_bytes = len(lines) self.size = self.end_bytes @@ -101,6 +102,18 @@ class RequestHandler(http.server.BaseHTTPRequestHandler): print("Dropping connection") break + if self.path == "/root/slow_send_test.csv": + self.send_block_size = 81920 + + for c, i in enumerate( + range(self.from_bytes, self.end_bytes, self.send_block_size) + ): + self.wfile.write( + lines[i : min(i + self.send_block_size, self.end_bytes)] + ) + self.wfile.flush() + time.sleep(1) + elif self.path == "/": self.wfile.write(b"OK") diff --git a/tests/integration/test_storage_s3/test.py b/tests/integration/test_storage_s3/test.py index 01ade1acc4d..8c79ad02445 100644 --- a/tests/integration/test_storage_s3/test.py +++ b/tests/integration/test_storage_s3/test.py @@ -818,6 +818,15 @@ def test_storage_s3_get_unstable(started_cluster): assert result.splitlines() == ["500001,500000,0"] +def test_storage_s3_get_slow(started_cluster): + bucket = started_cluster.minio_bucket + instance = started_cluster.instances["dummy"] + table_format = "column1 Int64, column2 Int64, column3 Int64, column4 Int64" + get_query = f"SELECT count(), sum(column3), sum(column4) FROM s3('http://resolver:8081/{started_cluster.minio_bucket}/slow_send_test.csv', 'CSV', '{table_format}') FORMAT CSV" + result = run_query(instance, get_query) + assert result.splitlines() == ["500001,500000,0"] + + def test_storage_s3_put_uncompressed(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] From 64c2a696666d594783c1996c0910166cacba000f Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Tue, 14 Nov 2023 20:28:37 +0100 Subject: [PATCH 13/19] check performance --- base/poco/Net/src/HTTPSession.cpp | 32 ++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/base/poco/Net/src/HTTPSession.cpp b/base/poco/Net/src/HTTPSession.cpp index 9ebbd7d04cd..97decded282 100644 --- a/base/poco/Net/src/HTTPSession.cpp +++ b/base/poco/Net/src/HTTPSession.cpp @@ -94,22 +94,24 @@ void HTTPSession::setTimeout(const Poco::Timespan& timeout) void HTTPSession::setTimeout(const Poco::Timespan& connectionTimeout, const Poco::Timespan& sendTimeout, const Poco::Timespan& receiveTimeout) { _connectionTimeout = connectionTimeout; + _sendTimeout = sendTimeout; + _receiveTimeout = receiveTimeout; - if (_sendTimeout.totalMicroseconds() != sendTimeout.totalMicroseconds()) - { - _sendTimeout = sendTimeout; - - if (connected()) - _socket.setSendTimeout(_sendTimeout); - } - - if (_receiveTimeout.totalMicroseconds() != receiveTimeout.totalMicroseconds()) - { - _receiveTimeout = receiveTimeout; - - if (connected()) - _socket.setReceiveTimeout(_receiveTimeout); - } +// if (_sendTimeout.totalMicroseconds() != sendTimeout.totalMicroseconds()) +// { +// _sendTimeout = sendTimeout; 
+// +// if (connected()) +// _socket.setSendTimeout(_sendTimeout); +// } +// +// if (_receiveTimeout.totalMicroseconds() != receiveTimeout.totalMicroseconds()) +// { +// _receiveTimeout = receiveTimeout; +// +// if (connected()) +// _socket.setReceiveTimeout(_receiveTimeout); +// } } From 6e3e6383ba0ad5b317c8189ac5a654ee5bb9057b Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Wed, 15 Nov 2023 19:00:27 +0100 Subject: [PATCH 14/19] perf check 2 --- base/poco/Net/src/HTTPServerSession.cpp | 1 - base/poco/Net/src/HTTPSession.cpp | 32 +++++++++----------- src/Disks/ObjectStorages/S3/diskSettings.cpp | 2 +- src/IO/S3/PocoHTTPClient.cpp | 1 - 4 files changed, 16 insertions(+), 20 deletions(-) diff --git a/base/poco/Net/src/HTTPServerSession.cpp b/base/poco/Net/src/HTTPServerSession.cpp index f6d3c4e5b92..d4f2b24879e 100644 --- a/base/poco/Net/src/HTTPServerSession.cpp +++ b/base/poco/Net/src/HTTPServerSession.cpp @@ -26,7 +26,6 @@ HTTPServerSession::HTTPServerSession(const StreamSocket& socket, HTTPServerParam _maxKeepAliveRequests(pParams->getMaxKeepAliveRequests()) { setTimeout(pParams->getTimeout()); - this->socket().setReceiveTimeout(pParams->getTimeout()); } diff --git a/base/poco/Net/src/HTTPSession.cpp b/base/poco/Net/src/HTTPSession.cpp index 97decded282..9ebbd7d04cd 100644 --- a/base/poco/Net/src/HTTPSession.cpp +++ b/base/poco/Net/src/HTTPSession.cpp @@ -94,24 +94,22 @@ void HTTPSession::setTimeout(const Poco::Timespan& timeout) void HTTPSession::setTimeout(const Poco::Timespan& connectionTimeout, const Poco::Timespan& sendTimeout, const Poco::Timespan& receiveTimeout) { _connectionTimeout = connectionTimeout; - _sendTimeout = sendTimeout; - _receiveTimeout = receiveTimeout; -// if (_sendTimeout.totalMicroseconds() != sendTimeout.totalMicroseconds()) -// { -// _sendTimeout = sendTimeout; -// -// if (connected()) -// _socket.setSendTimeout(_sendTimeout); -// } -// -// if (_receiveTimeout.totalMicroseconds() != receiveTimeout.totalMicroseconds()) -// { -// _receiveTimeout = receiveTimeout; -// -// if (connected()) -// _socket.setReceiveTimeout(_receiveTimeout); -// } + if (_sendTimeout.totalMicroseconds() != sendTimeout.totalMicroseconds()) + { + _sendTimeout = sendTimeout; + + if (connected()) + _socket.setSendTimeout(_sendTimeout); + } + + if (_receiveTimeout.totalMicroseconds() != receiveTimeout.totalMicroseconds()) + { + _receiveTimeout = receiveTimeout; + + if (connected()) + _socket.setReceiveTimeout(_receiveTimeout); + } } diff --git a/src/Disks/ObjectStorages/S3/diskSettings.cpp b/src/Disks/ObjectStorages/S3/diskSettings.cpp index b0384daab2d..0232a6eb070 100644 --- a/src/Disks/ObjectStorages/S3/diskSettings.cpp +++ b/src/Disks/ObjectStorages/S3/diskSettings.cpp @@ -67,7 +67,7 @@ std::unique_ptr getClient( config_prefix + ".http_keep_alive_timeout_ms", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT * 1000); client_configuration.http_connection_pool_size = config.getUInt(config_prefix + ".http_connection_pool_size", 1000); client_configuration.wait_on_pool_size_limit = false; - client_configuration.s3_use_adaptive_timeouts = config.getUInt( + client_configuration.s3_use_adaptive_timeouts = config.getBool( config_prefix + ".use_adaptive_timeouts", client_configuration.s3_use_adaptive_timeouts); /* diff --git a/src/IO/S3/PocoHTTPClient.cpp b/src/IO/S3/PocoHTTPClient.cpp index 904e2324145..f681362e607 100644 --- a/src/IO/S3/PocoHTTPClient.cpp +++ b/src/IO/S3/PocoHTTPClient.cpp @@ -14,7 +14,6 @@ #include #include #include -#include #include #include #include From 
61948e31714f1aa3fa8143a569a72403c5c70408 Mon Sep 17 00:00:00 2001 From: Sema Checherinda <104093494+CheSema@users.noreply.github.com> Date: Thu, 16 Nov 2023 11:12:45 +0100 Subject: [PATCH 15/19] Update src/Core/Settings.h Co-authored-by: Nikita Taranov --- src/Core/Settings.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 34547aded9c..76016bc70fb 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -94,7 +94,7 @@ class IColumn; M(UInt64, s3_max_put_rps, 0, "Limit on S3 PUT request per second rate before throttling. Zero means unlimited.", 0) \ M(UInt64, s3_max_put_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_put_rps`", 0) \ M(UInt64, s3_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \ - M(Bool, s3_use_adaptive_timeouts, true, "When aggressive timeouts are enabled first two attempts are made with low receive and send timeout", 0) \ + M(Bool, s3_use_adaptive_timeouts, true, "When adaptive timeouts are enabled first two attempts are made with low receive and send timeout", 0) \ M(UInt64, azure_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \ M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \ M(Bool, azure_truncate_on_insert, false, "Enables or disables truncate before insert in azure engine tables.", 0) \ From ebd42187ad12ce2be24833820f59bb3d81def382 Mon Sep 17 00:00:00 2001 From: Sema Checherinda <104093494+CheSema@users.noreply.github.com> Date: Thu, 16 Nov 2023 12:29:15 +0100 Subject: [PATCH 16/19] Update tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml Co-authored-by: Nikita Taranov --- .../configs/config.d/storage_conf.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml b/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml index f51b854de75..98c6f551be6 100644 --- a/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml +++ b/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml @@ -34,7 +34,7 @@ true 1 - 0 + 0 1 20000 From 7d37c0e07073b2a1909e80c4aea45fdc4a35be75 Mon Sep 17 00:00:00 2001 From: Sema Checherinda <104093494+CheSema@users.noreply.github.com> Date: Thu, 16 Nov 2023 12:29:21 +0100 Subject: [PATCH 17/19] Update tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml Co-authored-by: Nikita Taranov --- .../configs/config.d/storage_conf.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml b/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml index 98c6f551be6..6303e9273fc 100644 --- a/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml +++ b/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml @@ -11,7 +11,7 @@ true 0 - 0 + 0 20000 From 4a1e207e7a5b02da2b2f6ea46edecd9fe6a9185c Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Thu, 16 Nov 2023 12:31:00 +0100 Subject: [PATCH 18/19] review notes --- base/poco/Net/src/HTTPSession.cpp | 34 +++++++++++++++++++++---------- src/IO/S3/PocoHTTPClient.cpp | 20 
++++++++++++++---- src/IO/S3/PocoHTTPClient.h | 6 +++--- 3 files changed, 42 insertions(+), 18 deletions(-) diff --git a/base/poco/Net/src/HTTPSession.cpp b/base/poco/Net/src/HTTPSession.cpp index 9ebbd7d04cd..d303a4c654b 100644 --- a/base/poco/Net/src/HTTPSession.cpp +++ b/base/poco/Net/src/HTTPSession.cpp @@ -93,22 +93,34 @@ void HTTPSession::setTimeout(const Poco::Timespan& timeout) void HTTPSession::setTimeout(const Poco::Timespan& connectionTimeout, const Poco::Timespan& sendTimeout, const Poco::Timespan& receiveTimeout) { - _connectionTimeout = connectionTimeout; - - if (_sendTimeout.totalMicroseconds() != sendTimeout.totalMicroseconds()) + try { - _sendTimeout = sendTimeout; + _connectionTimeout = connectionTimeout; - if (connected()) - _socket.setSendTimeout(_sendTimeout); + if (_sendTimeout.totalMicroseconds() != sendTimeout.totalMicroseconds()) { + _sendTimeout = sendTimeout; + + if (connected()) + _socket.setSendTimeout(_sendTimeout); + } + + if (_receiveTimeout.totalMicroseconds() != receiveTimeout.totalMicroseconds()) { + _receiveTimeout = receiveTimeout; + + if (connected()) + _socket.setReceiveTimeout(_receiveTimeout); + } } - - if (_receiveTimeout.totalMicroseconds() != receiveTimeout.totalMicroseconds()) + catch (NetException &) { - _receiveTimeout = receiveTimeout; - if (connected()) - _socket.setReceiveTimeout(_receiveTimeout); +#ifndef NDEBUG + // mute exceptions in release + // just in case when changing settings on socket is not allowed + // however it should be OK for timeouts +#else + throw; +#endif } } diff --git a/src/IO/S3/PocoHTTPClient.cpp b/src/IO/S3/PocoHTTPClient.cpp index f681362e607..4a1b6def133 100644 --- a/src/IO/S3/PocoHTTPClient.cpp +++ b/src/IO/S3/PocoHTTPClient.cpp @@ -407,17 +407,29 @@ void PocoHTTPClient::makeRequestInternalImpl( /// This can lead to request signature difference on S3 side. 
if constexpr (pooled) session = makePooledHTTPSession( - target_uri, getTimeouts(method, first_attempt), http_connection_pool_size, wait_on_pool_size_limit, proxy_configuration); + target_uri, + getTimeouts(method, first_attempt, /*first_byte*/ true), + http_connection_pool_size, + wait_on_pool_size_limit, + proxy_configuration); else - session = makeHTTPSession(target_uri, getTimeouts(method, first_attempt), proxy_configuration); + session = makeHTTPSession( + target_uri, + getTimeouts(method, first_attempt, /*first_byte*/ true), + proxy_configuration); } else { if constexpr (pooled) session = makePooledHTTPSession( - target_uri, getTimeouts(method, first_attempt), http_connection_pool_size, wait_on_pool_size_limit); + target_uri, + getTimeouts(method, first_attempt, /*first_byte*/ true), + http_connection_pool_size, + wait_on_pool_size_limit); else - session = makeHTTPSession(target_uri, getTimeouts(method, first_attempt)); + session = makeHTTPSession( + target_uri, + getTimeouts(method, first_attempt, /*first_byte*/ true)); } /// In case of error this address will be written to logs diff --git a/src/IO/S3/PocoHTTPClient.h b/src/IO/S3/PocoHTTPClient.h index 14c4fec5dd7..5178d75e7b6 100644 --- a/src/IO/S3/PocoHTTPClient.h +++ b/src/IO/S3/PocoHTTPClient.h @@ -55,7 +55,7 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration size_t http_connection_pool_size = 0; /// See PoolBase::BehaviourOnLimit bool wait_on_pool_size_limit = true; - bool s3_use_adaptive_timeouts = false; + bool s3_use_adaptive_timeouts = true; std::function error_report; @@ -171,7 +171,7 @@ private: Aws::Utils::RateLimits::RateLimiterInterface * readLimiter, Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const; - ConnectionTimeouts getTimeouts(const String & method, bool first_attempt, bool first_byte = true) const; + ConnectionTimeouts getTimeouts(const String & method, bool first_attempt, bool first_byte) const; protected: static S3MetricKind getMetricKind(const Aws::Http::HttpRequest & request); @@ -182,7 +182,7 @@ protected: ConnectionTimeouts timeouts; const RemoteHostFilter & remote_host_filter; unsigned int s3_max_redirects; - bool s3_use_adaptive_timeouts = false; + bool s3_use_adaptive_timeouts = true; bool enable_s3_requests_logging; bool for_disk_s3; From 74a8f3191dc96f3ac46187b556b5d127b7ae6030 Mon Sep 17 00:00:00 2001 From: Sema Checherinda <104093494+CheSema@users.noreply.github.com> Date: Fri, 17 Nov 2023 11:38:56 +0100 Subject: [PATCH 19/19] Update HTTPSession.cpp --- base/poco/Net/src/HTTPSession.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/base/poco/Net/src/HTTPSession.cpp b/base/poco/Net/src/HTTPSession.cpp index d303a4c654b..8f951b3102c 100644 --- a/base/poco/Net/src/HTTPSession.cpp +++ b/base/poco/Net/src/HTTPSession.cpp @@ -113,13 +113,12 @@ void HTTPSession::setTimeout(const Poco::Timespan& connectionTimeout, const Poco } catch (NetException &) { - #ifndef NDEBUG + throw; +#else // mute exceptions in release // just in case when changing settings on socket is not allowed // however it should be OK for timeouts -#else - throw; #endif } }
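Taken together, the series settles on this behaviour: while connecting and waiting for the first byte of a first attempt, send/receive timeouts are clamped to 200 ms; for the rest of a first attempt they are clamped per method; any retry runs with the fully configured budget. The sketch below illustrates that selection with std::chrono stand-ins for Poco::Timespan (the receive-side constants echo the first_byte_ms and rest_bytes_ms tables above; function names are illustrative):

    #include <algorithm>
    #include <chrono>
    #include <iostream>
    #include <string>

    using Ms = std::chrono::milliseconds;

    /// Never loosen a timeout beyond what the user configured (mirrors saturate()).
    Ms saturate(Ms candidate, Ms configured)
    {
        return std::min(candidate, configured);
    }

    /// Receive timeout selection for one request, following the tables above:
    /// 200 ms to the first byte of a first attempt, per-method limits for the
    /// rest of the body, and the untouched configured value on any retry.
    Ms pickReceiveTimeout(const std::string & method, bool first_attempt, bool first_byte, Ms configured)
    {
        if (!first_attempt)
            return configured;
        if (first_byte)
            return saturate(Ms(200), configured);
        if (method == "POST")
            return saturate(Ms(30000), configured); /// CompleteMultipartUpload may idle for a long time
        if (method == "DELETE" || method == "PATCH")
            return saturate(Ms(10000), configured);
        if (method == "PUT")
            return saturate(Ms(3000), configured);
        return saturate(Ms(500), configured);       /// GET, HEAD
    }

    int main()
    {
        const Ms configured(30000);
        std::cout << pickReceiveTimeout("GET", true, true, configured).count() << '\n';   /// 200
        std::cout << pickReceiveTimeout("GET", true, false, configured).count() << '\n';  /// 500
        std::cout << pickReceiveTimeout("POST", true, false, configured).count() << '\n'; /// 30000
        std::cout << pickReceiveTimeout("GET", false, true, configured).count() << '\n';  /// 30000
        return 0;
    }

The effect: a silent endpoint is abandoned within 200 ms on the first try instead of after the configured 30 s, while retries and genuinely slow operations such as CompleteMultipartUpload keep their full timeout.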