Merge pull request #27216 from ianton-ru/MDB-13990

This commit is contained in:
Vladimir C 2021-08-06 18:12:00 +03:00 committed by GitHub
commit 9de16c5017
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 65 additions and 10 deletions

View File

@ -19,6 +19,7 @@ public:
virtual ~ProxyConfiguration() = default;
/// Returns proxy configuration on each HTTP request.
virtual Aws::Client::ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request) = 0;
/// Reports that a request performed with the given per-request configuration ended with an error,
/// so that an implementation can react to it (for example, stop using a cached proxy).
virtual void errorReport(const Aws::Client::ClientConfigurationPerRequest & config) = 0;
};
}

View File

@ -20,6 +20,7 @@ class ProxyListConfiguration : public ProxyConfiguration
public:
explicit ProxyListConfiguration(std::vector<Poco::URI> proxies_);
Aws::Client::ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request) override;
/// Error reports are ignored by this implementation: the body is intentionally empty.
void errorReport(const Aws::Client::ClientConfigurationPerRequest &) override {}
private:
/// List of configured proxies.

View File

@ -16,8 +16,10 @@ namespace DB::ErrorCodes
namespace DB::S3
{
ProxyResolverConfiguration::ProxyResolverConfiguration(const Poco::URI & endpoint_, String proxy_scheme_, unsigned proxy_port_)
: endpoint(endpoint_), proxy_scheme(std::move(proxy_scheme_)), proxy_port(proxy_port_)
ProxyResolverConfiguration::ProxyResolverConfiguration(const Poco::URI & endpoint_, String proxy_scheme_
, unsigned proxy_port_, unsigned cache_ttl_)
: endpoint(endpoint_), proxy_scheme(std::move(proxy_scheme_)), proxy_port(proxy_port_), cache_ttl(cache_ttl_)
{
}
@ -25,16 +27,25 @@ Aws::Client::ClientConfigurationPerRequest ProxyResolverConfiguration::getConfig
{
LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Obtain proxy using resolver: {}", endpoint.toString());
std::unique_lock lock(cache_mutex);
std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
if (cache_ttl.count() && cache_valid && now <= cache_timestamp + cache_ttl && now >= cache_timestamp)
{
LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Use cached proxy: {}://{}:{}", Aws::Http::SchemeMapper::ToString(cached_config.proxyScheme), cached_config.proxyHost, cached_config.proxyPort);
return cached_config;
}
/// 1 second is enough for now.
/// TODO: Make timeouts configurable.
ConnectionTimeouts timeouts(
Poco::Timespan(1000000), /// Connection timeout.
Poco::Timespan(1000000), /// Send timeout.
Poco::Timespan(1000000) /// Receive timeout.
Poco::Timespan(1000000) /// Receive timeout.
);
auto session = makeHTTPSession(endpoint, timeouts);
Aws::Client::ClientConfigurationPerRequest cfg;
try
{
/// It should be just empty GET request.
@ -53,20 +64,41 @@ Aws::Client::ClientConfigurationPerRequest ProxyResolverConfiguration::getConfig
LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Use proxy: {}://{}:{}", proxy_scheme, proxy_host, proxy_port);
cfg.proxyScheme = Aws::Http::SchemeMapper::FromString(proxy_scheme.c_str());
cfg.proxyHost = proxy_host;
cfg.proxyPort = proxy_port;
cached_config.proxyScheme = Aws::Http::SchemeMapper::FromString(proxy_scheme.c_str());
cached_config.proxyHost = proxy_host;
cached_config.proxyPort = proxy_port;
cache_timestamp = std::chrono::system_clock::now();
cache_valid = true;
return cfg;
return cached_config;
}
catch (...)
{
tryLogCurrentException("AWSClient", "Failed to obtain proxy");
/// Don't use proxy if it can't be obtained.
Aws::Client::ClientConfigurationPerRequest cfg;
return cfg;
}
}
/// Drops the cached proxy if `config` describes exactly that proxy.
/// Does nothing when the request used no proxy, when caching is turned off (zero TTL),
/// or when there is no valid cached entry.
void ProxyResolverConfiguration::errorReport(const Aws::Client::ClientConfigurationPerRequest & config)
{
    /// A request without a proxy host cannot tell us anything about the cached proxy.
    if (config.proxyHost.empty())
        return;

    std::unique_lock lock(cache_mutex);

    const bool has_live_cache = cache_ttl.count() != 0 && cache_valid;
    if (!has_live_cache)
        return;

    const bool is_cached_proxy = cached_config.proxyScheme == config.proxyScheme
        && cached_config.proxyHost == config.proxyHost
        && cached_config.proxyPort == config.proxyPort;

    /// The error came from the proxy we have cached, so force a fresh lookup next time.
    if (is_cached_proxy)
        cache_valid = false;
}
}
#endif

View File

@ -8,6 +8,8 @@
#include "ProxyConfiguration.h"
#include <mutex>
namespace DB::S3
{
/**
@ -18,8 +20,9 @@ namespace DB::S3
class ProxyResolverConfiguration : public ProxyConfiguration
{
public:
ProxyResolverConfiguration(const Poco::URI & endpoint_, String proxy_scheme_, unsigned proxy_port_);
ProxyResolverConfiguration(const Poco::URI & endpoint_, String proxy_scheme_, unsigned proxy_port_, unsigned cache_ttl_);
Aws::Client::ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request) override;
/// Invalidates the cached proxy when `config` refers to that same proxy (scheme, host and port all match).
void errorReport(const Aws::Client::ClientConfigurationPerRequest & config) override;
private:
/// Endpoint to obtain a proxy host.
@ -28,6 +31,12 @@ private:
const String proxy_scheme;
/// Port for obtained proxy.
const unsigned proxy_port;
/// Held while the cache fields below are read or written.
std::mutex cache_mutex;
/// True while cached_config holds a resolved proxy that has not been invalidated by errorReport().
bool cache_valid = false;
/// Moment when cached_config was last filled.
std::chrono::time_point<std::chrono::system_clock> cache_timestamp;
/// How long a cached proxy may be reused, in seconds; with zero the cached value is never used.
const std::chrono::seconds cache_ttl{0};
/// Most recently resolved proxy configuration.
Aws::Client::ClientConfigurationPerRequest cached_config;
};
}

View File

@ -56,11 +56,12 @@ std::shared_ptr<S3::ProxyResolverConfiguration> getProxyResolverConfiguration(
if (proxy_scheme != "http" && proxy_scheme != "https")
throw Exception("Only HTTP/HTTPS schemas allowed in proxy resolver config: " + proxy_scheme, ErrorCodes::BAD_ARGUMENTS);
auto proxy_port = proxy_resolver_config.getUInt(prefix + ".proxy_port");
auto cache_ttl = proxy_resolver_config.getUInt(prefix + ".proxy_cache_time", 10);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Configured proxy resolver: {}, Scheme: {}, Port: {}",
endpoint.toString(), proxy_scheme, proxy_port);
return std::make_shared<S3::ProxyResolverConfiguration>(endpoint, proxy_scheme, proxy_port);
return std::make_shared<S3::ProxyResolverConfiguration>(endpoint, proxy_scheme, proxy_port, cache_ttl);
}
std::shared_ptr<S3::ProxyListConfiguration> getProxyListConfiguration(
@ -128,8 +129,12 @@ getClient(const Poco::Util::AbstractConfiguration & config, const String & confi
auto proxy_config = getProxyConfiguration(config_prefix, config);
if (proxy_config)
{
client_configuration.perRequestConfiguration
= [proxy_config](const auto & request) { return proxy_config->getConfiguration(request); };
client_configuration.error_report
= [proxy_config](const auto & request_config) { proxy_config->errorReport(request_config); };
}
client_configuration.retryStrategy
= std::make_shared<Aws::Client::DefaultRetryStrategy>(config.getUInt(config_prefix + ".retry_attempts", 10));

View File

@ -89,6 +89,7 @@ void PocoHTTPClientConfiguration::updateSchemeAndRegion()
PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & clientConfiguration)
: per_request_configuration(clientConfiguration.perRequestConfiguration)
, error_report(clientConfiguration.error_report)
, timeouts(ConnectionTimeouts(
Poco::Timespan(clientConfiguration.connectTimeoutMs * 1000), /// connection timeout.
Poco::Timespan(clientConfiguration.requestTimeoutMs * 1000), /// send timeout.
@ -296,6 +297,8 @@ void PocoHTTPClient::makeRequestInternal(
else if (status_code >= 300)
{
ProfileEvents::increment(select_metric(S3MetricType::Errors));
if (status_code >= 500 && error_report)
error_report(request_configuration);
}
response->SetResponseBody(response_body_stream, session);

View File

@ -37,6 +37,8 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
void updateSchemeAndRegion();
/// Optional callback that receives the per-request configuration of a request that ended with an error.
/// May be left empty.
std::function<void(const Aws::Client::ClientConfigurationPerRequest &)> error_report;
private:
PocoHTTPClientConfiguration(const String & force_region_, const RemoteHostFilter & remote_host_filter_, unsigned int s3_max_redirects_);
@ -95,6 +97,7 @@ private:
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const;
std::function<Aws::Client::ClientConfigurationPerRequest(const Aws::Http::HttpRequest &)> per_request_configuration;
/// Copied from the client configuration; called (if set) with the request configuration
/// when a response has a status code of 500 or above.
std::function<void(const Aws::Client::ClientConfigurationPerRequest &)> error_report;
ConnectionTimeouts timeouts;
const RemoteHostFilter & remote_host_filter;
unsigned int s3_max_redirects;

View File

@ -26,6 +26,7 @@
<endpoint>http://resolver:8080/hostname</endpoint>
<proxy_scheme>http</proxy_scheme>
<proxy_port>80</proxy_port>
<proxy_cache_time>10</proxy_cache_time>
</resolver>
</proxy>
</s3_with_resolver>