diff --git a/src/Disks/S3/ProxyConfiguration.h b/src/Disks/S3/ProxyConfiguration.h index 888f4c6faf9..793170e727c 100644 --- a/src/Disks/S3/ProxyConfiguration.h +++ b/src/Disks/S3/ProxyConfiguration.h @@ -19,6 +19,7 @@ public: virtual ~ProxyConfiguration() = default; /// Returns proxy configuration on each HTTP request. virtual Aws::Client::ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request) = 0; + virtual void errorReport(const Aws::Client::ClientConfigurationPerRequest & config) = 0; }; } diff --git a/src/Disks/S3/ProxyListConfiguration.h b/src/Disks/S3/ProxyListConfiguration.h index 4d37d2b6d69..bd5bbba19a4 100644 --- a/src/Disks/S3/ProxyListConfiguration.h +++ b/src/Disks/S3/ProxyListConfiguration.h @@ -20,6 +20,7 @@ class ProxyListConfiguration : public ProxyConfiguration public: explicit ProxyListConfiguration(std::vector proxies_); Aws::Client::ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request) override; + void errorReport(const Aws::Client::ClientConfigurationPerRequest &) override {} private: /// List of configured proxies. diff --git a/src/Disks/S3/ProxyResolverConfiguration.cpp b/src/Disks/S3/ProxyResolverConfiguration.cpp index b959d8b4415..17dd19fe444 100644 --- a/src/Disks/S3/ProxyResolverConfiguration.cpp +++ b/src/Disks/S3/ProxyResolverConfiguration.cpp @@ -16,8 +16,10 @@ namespace DB::ErrorCodes namespace DB::S3 { -ProxyResolverConfiguration::ProxyResolverConfiguration(const Poco::URI & endpoint_, String proxy_scheme_, unsigned proxy_port_) - : endpoint(endpoint_), proxy_scheme(std::move(proxy_scheme_)), proxy_port(proxy_port_) + +ProxyResolverConfiguration::ProxyResolverConfiguration(const Poco::URI & endpoint_, String proxy_scheme_ + , unsigned proxy_port_, unsigned cache_ttl_) + : endpoint(endpoint_), proxy_scheme(std::move(proxy_scheme_)), proxy_port(proxy_port_), cache_ttl(cache_ttl_) { } @@ -25,16 +27,25 @@ Aws::Client::ClientConfigurationPerRequest ProxyResolverConfiguration::getConfig { LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Obtain proxy using resolver: {}", endpoint.toString()); + std::unique_lock lock(cache_mutex); + + std::chrono::time_point now = std::chrono::system_clock::now(); + + if (cache_ttl.count() && cache_valid && now <= cache_timestamp + cache_ttl && now >= cache_timestamp) + { + LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Use cached proxy: {}://{}:{}", Aws::Http::SchemeMapper::ToString(cached_config.proxyScheme), cached_config.proxyHost, cached_config.proxyPort); + return cached_config; + } + /// 1 second is enough for now. /// TODO: Make timeouts configurable. ConnectionTimeouts timeouts( Poco::Timespan(1000000), /// Connection timeout. Poco::Timespan(1000000), /// Send timeout. - Poco::Timespan(1000000) /// Receive timeout. + Poco::Timespan(1000000) /// Receive timeout. ); auto session = makeHTTPSession(endpoint, timeouts); - Aws::Client::ClientConfigurationPerRequest cfg; try { /// It should be just empty GET request. @@ -53,20 +64,41 @@ Aws::Client::ClientConfigurationPerRequest ProxyResolverConfiguration::getConfig LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Use proxy: {}://{}:{}", proxy_scheme, proxy_host, proxy_port); - cfg.proxyScheme = Aws::Http::SchemeMapper::FromString(proxy_scheme.c_str()); - cfg.proxyHost = proxy_host; - cfg.proxyPort = proxy_port; + cached_config.proxyScheme = Aws::Http::SchemeMapper::FromString(proxy_scheme.c_str()); + cached_config.proxyHost = proxy_host; + cached_config.proxyPort = proxy_port; + cache_timestamp = std::chrono::system_clock::now(); + cache_valid = true; - return cfg; + return cached_config; } catch (...) { tryLogCurrentException("AWSClient", "Failed to obtain proxy"); /// Don't use proxy if it can't be obtained. + Aws::Client::ClientConfigurationPerRequest cfg; return cfg; } } +void ProxyResolverConfiguration::errorReport(const Aws::Client::ClientConfigurationPerRequest & config) +{ + if (config.proxyHost.empty()) + return; + + std::unique_lock lock(cache_mutex); + + if (!cache_ttl.count() || !cache_valid) + return; + + if (cached_config.proxyScheme != config.proxyScheme || cached_config.proxyHost != config.proxyHost + || cached_config.proxyPort != config.proxyPort) + return; + + /// Invalidate cached proxy when got error with this proxy + cache_valid = false; +} + } #endif diff --git a/src/Disks/S3/ProxyResolverConfiguration.h b/src/Disks/S3/ProxyResolverConfiguration.h index 8eea662f257..f7eba8d028a 100644 --- a/src/Disks/S3/ProxyResolverConfiguration.h +++ b/src/Disks/S3/ProxyResolverConfiguration.h @@ -8,6 +8,8 @@ #include "ProxyConfiguration.h" +#include + namespace DB::S3 { /** @@ -18,8 +20,9 @@ namespace DB::S3 class ProxyResolverConfiguration : public ProxyConfiguration { public: - ProxyResolverConfiguration(const Poco::URI & endpoint_, String proxy_scheme_, unsigned proxy_port_); + ProxyResolverConfiguration(const Poco::URI & endpoint_, String proxy_scheme_, unsigned proxy_port_, unsigned cache_ttl_); Aws::Client::ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request) override; + void errorReport(const Aws::Client::ClientConfigurationPerRequest & config) override; private: /// Endpoint to obtain a proxy host. @@ -28,6 +31,12 @@ private: const String proxy_scheme; /// Port for obtained proxy. const unsigned proxy_port; + + std::mutex cache_mutex; + bool cache_valid = false; + std::chrono::time_point cache_timestamp; + const std::chrono::seconds cache_ttl{0}; + Aws::Client::ClientConfigurationPerRequest cached_config; }; } diff --git a/src/Disks/S3/registerDiskS3.cpp b/src/Disks/S3/registerDiskS3.cpp index 49a11b1dbb9..01b2cea2045 100644 --- a/src/Disks/S3/registerDiskS3.cpp +++ b/src/Disks/S3/registerDiskS3.cpp @@ -56,11 +56,12 @@ std::shared_ptr getProxyResolverConfiguration( if (proxy_scheme != "http" && proxy_scheme != "https") throw Exception("Only HTTP/HTTPS schemas allowed in proxy resolver config: " + proxy_scheme, ErrorCodes::BAD_ARGUMENTS); auto proxy_port = proxy_resolver_config.getUInt(prefix + ".proxy_port"); + auto cache_ttl = proxy_resolver_config.getUInt(prefix + ".proxy_cache_time", 10); LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Configured proxy resolver: {}, Scheme: {}, Port: {}", endpoint.toString(), proxy_scheme, proxy_port); - return std::make_shared(endpoint, proxy_scheme, proxy_port); + return std::make_shared(endpoint, proxy_scheme, proxy_port, cache_ttl); } std::shared_ptr getProxyListConfiguration( @@ -128,8 +129,12 @@ getClient(const Poco::Util::AbstractConfiguration & config, const String & confi auto proxy_config = getProxyConfiguration(config_prefix, config); if (proxy_config) + { client_configuration.perRequestConfiguration = [proxy_config](const auto & request) { return proxy_config->getConfiguration(request); }; + client_configuration.error_report + = [proxy_config](const auto & request_config) { proxy_config->errorReport(request_config); }; + } client_configuration.retryStrategy = std::make_shared(config.getUInt(config_prefix + ".retry_attempts", 10)); diff --git a/src/IO/S3/PocoHTTPClient.cpp b/src/IO/S3/PocoHTTPClient.cpp index 618d9ab7661..78cb5300101 100644 --- a/src/IO/S3/PocoHTTPClient.cpp +++ b/src/IO/S3/PocoHTTPClient.cpp @@ -89,6 +89,7 @@ void PocoHTTPClientConfiguration::updateSchemeAndRegion() PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & clientConfiguration) : per_request_configuration(clientConfiguration.perRequestConfiguration) + , error_report(clientConfiguration.error_report) , timeouts(ConnectionTimeouts( Poco::Timespan(clientConfiguration.connectTimeoutMs * 1000), /// connection timeout. Poco::Timespan(clientConfiguration.requestTimeoutMs * 1000), /// send timeout. @@ -296,6 +297,8 @@ void PocoHTTPClient::makeRequestInternal( else if (status_code >= 300) { ProfileEvents::increment(select_metric(S3MetricType::Errors)); + if (status_code >= 500 && error_report) + error_report(request_configuration); } response->SetResponseBody(response_body_stream, session); diff --git a/src/IO/S3/PocoHTTPClient.h b/src/IO/S3/PocoHTTPClient.h index e374863cf00..12f5af60ed4 100644 --- a/src/IO/S3/PocoHTTPClient.h +++ b/src/IO/S3/PocoHTTPClient.h @@ -37,6 +37,8 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration void updateSchemeAndRegion(); + std::function error_report; + private: PocoHTTPClientConfiguration(const String & force_region_, const RemoteHostFilter & remote_host_filter_, unsigned int s3_max_redirects_); @@ -95,6 +97,7 @@ private: Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const; std::function per_request_configuration; + std::function error_report; ConnectionTimeouts timeouts; const RemoteHostFilter & remote_host_filter; unsigned int s3_max_redirects; diff --git a/tests/integration/test_s3_with_proxy/configs/config.d/storage_conf.xml b/tests/integration/test_s3_with_proxy/configs/config.d/storage_conf.xml index ccae67c7c09..a8d36a53bd5 100644 --- a/tests/integration/test_s3_with_proxy/configs/config.d/storage_conf.xml +++ b/tests/integration/test_s3_with_proxy/configs/config.d/storage_conf.xml @@ -26,6 +26,7 @@ http://resolver:8080/hostname http 80 + 10