Add 's3_max_redirects' setting

This commit is contained in:
Anton Ivashkin 2020-11-13 19:31:51 +03:00
parent bdbd41bd3d
commit 026f7e0a27
7 changed files with 37 additions and 15 deletions

View File

@ -65,6 +65,7 @@ class IColumn;
M(UInt64, distributed_connections_pool_size, DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE, "Maximum number of connections with one remote server in the pool.", 0) \ M(UInt64, distributed_connections_pool_size, DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE, "Maximum number of connections with one remote server in the pool.", 0) \
M(UInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.", 0) \ M(UInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.", 0) \
M(UInt64, s3_min_upload_part_size, 512*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \ M(UInt64, s3_min_upload_part_size, 512*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \ M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
M(Bool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.", 0) \ M(Bool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.", 0) \
M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \ M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \

View File

@ -132,7 +132,8 @@ void registerDiskS3(DiskFactory & factory)
uri.is_virtual_hosted_style, uri.is_virtual_hosted_style,
config.getString(config_prefix + ".access_key_id", ""), config.getString(config_prefix + ".access_key_id", ""),
config.getString(config_prefix + ".secret_access_key", ""), config.getString(config_prefix + ".secret_access_key", ""),
context.getRemoteHostFilter()); context.getRemoteHostFilter(),
context.getGlobalContext());
String metadata_path = config.getString(config_prefix + ".metadata_path", context.getPath() + "disks/" + name + "/"); String metadata_path = config.getString(config_prefix + ".metadata_path", context.getPath() + "disks/" + name + "/");

View File

@ -8,6 +8,7 @@
#include <IO/HTTPCommon.h> #include <IO/HTTPCommon.h>
#include <IO/S3/PocoHTTPResponseStream.h> #include <IO/S3/PocoHTTPResponseStream.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Interpreters/Context.h>
#include <aws/core/http/HttpRequest.h> #include <aws/core/http/HttpRequest.h>
#include <aws/core/http/HttpResponse.h> #include <aws/core/http/HttpResponse.h>
#include <aws/core/http/standard/StandardHttpResponse.h> #include <aws/core/http/standard/StandardHttpResponse.h>
@ -48,9 +49,11 @@ namespace DB::S3
PocoHTTPClientConfiguration::PocoHTTPClientConfiguration( PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
const Aws::Client::ClientConfiguration & cfg, const Aws::Client::ClientConfiguration & cfg,
const RemoteHostFilter & remote_host_filter_) const RemoteHostFilter & remote_host_filter_,
const Context & global_context_)
: Aws::Client::ClientConfiguration(cfg) : Aws::Client::ClientConfiguration(cfg)
, remote_host_filter(remote_host_filter_) , remote_host_filter(remote_host_filter_)
, global_context(global_context_)
{ {
} }
@ -81,6 +84,7 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & clientConfigu
Poco::Timespan(clientConfiguration.httpRequestTimeoutMs * 1000) /// receive timeout. Poco::Timespan(clientConfiguration.httpRequestTimeoutMs * 1000) /// receive timeout.
)) ))
, remote_host_filter(clientConfiguration.remote_host_filter) , remote_host_filter(clientConfiguration.remote_host_filter)
, global_context(clientConfiguration.global_context)
{ {
} }
@ -155,10 +159,10 @@ void PocoHTTPClient::makeRequestInternal(
ProfileEvents::increment(select_metric(S3MetricType::Count)); ProfileEvents::increment(select_metric(S3MetricType::Count));
static constexpr int max_redirect_attempts = 10; unsigned int max_redirect_attempts = global_context.getSettingsRef().s3_max_redirects;
try try
{ {
for (int attempt = 0; attempt < max_redirect_attempts; ++attempt) for (unsigned int attempt = 0; attempt < max_redirect_attempts; ++attempt)
{ {
Poco::URI poco_uri(uri); Poco::URI poco_uri(uri);

View File

@ -11,14 +11,21 @@ namespace Aws::Http::Standard
class StandardHttpResponse; class StandardHttpResponse;
} }
namespace DB
{
class Context;
}
namespace DB::S3 namespace DB::S3
{ {
struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
{ {
const RemoteHostFilter & remote_host_filter; const RemoteHostFilter & remote_host_filter;
const Context & global_context;
PocoHTTPClientConfiguration(const Aws::Client::ClientConfiguration & cfg, const RemoteHostFilter & remote_host_filter_); PocoHTTPClientConfiguration(const Aws::Client::ClientConfiguration & cfg, const RemoteHostFilter & remote_host_filter_,
const Context & global_context_);
void updateSchemeAndRegion(); void updateSchemeAndRegion();
}; };
@ -48,6 +55,7 @@ private:
std::function<Aws::Client::ClientConfigurationPerRequest(const Aws::Http::HttpRequest &)> per_request_configuration; std::function<Aws::Client::ClientConfigurationPerRequest(const Aws::Http::HttpRequest &)> per_request_configuration;
ConnectionTimeouts timeouts; ConnectionTimeouts timeouts;
const RemoteHostFilter & remote_host_filter; const RemoteHostFilter & remote_host_filter;
const Context & global_context;
}; };
} }

View File

@ -164,14 +164,15 @@ namespace S3
bool is_virtual_hosted_style, bool is_virtual_hosted_style,
const String & access_key_id, const String & access_key_id,
const String & secret_access_key, const String & secret_access_key,
const RemoteHostFilter & remote_host_filter) const RemoteHostFilter & remote_host_filter,
const Context & global_context)
{ {
Aws::Client::ClientConfiguration cfg; Aws::Client::ClientConfiguration cfg;
if (!endpoint.empty()) if (!endpoint.empty())
cfg.endpointOverride = endpoint; cfg.endpointOverride = endpoint;
return create(cfg, is_virtual_hosted_style, access_key_id, secret_access_key, remote_host_filter); return create(cfg, is_virtual_hosted_style, access_key_id, secret_access_key, remote_host_filter, global_context);
} }
std::shared_ptr<Aws::S3::S3Client> ClientFactory::create( // NOLINT std::shared_ptr<Aws::S3::S3Client> ClientFactory::create( // NOLINT
@ -179,11 +180,12 @@ namespace S3
bool is_virtual_hosted_style, bool is_virtual_hosted_style,
const String & access_key_id, const String & access_key_id,
const String & secret_access_key, const String & secret_access_key,
const RemoteHostFilter & remote_host_filter) const RemoteHostFilter & remote_host_filter,
const Context & global_context)
{ {
Aws::Auth::AWSCredentials credentials(access_key_id, secret_access_key); Aws::Auth::AWSCredentials credentials(access_key_id, secret_access_key);
PocoHTTPClientConfiguration client_configuration(cfg, remote_host_filter); PocoHTTPClientConfiguration client_configuration(cfg, remote_host_filter, global_context);
client_configuration.updateSchemeAndRegion(); client_configuration.updateSchemeAndRegion();
@ -201,9 +203,10 @@ namespace S3
const String & access_key_id, const String & access_key_id,
const String & secret_access_key, const String & secret_access_key,
HeaderCollection headers, HeaderCollection headers,
const RemoteHostFilter & remote_host_filter) const RemoteHostFilter & remote_host_filter,
const Context & global_context)
{ {
PocoHTTPClientConfiguration client_configuration({}, remote_host_filter); PocoHTTPClientConfiguration client_configuration({}, remote_host_filter, global_context);
if (!endpoint.empty()) if (!endpoint.empty())
client_configuration.endpointOverride = endpoint; client_configuration.endpointOverride = endpoint;

View File

@ -19,6 +19,7 @@ namespace DB
class RemoteHostFilter; class RemoteHostFilter;
struct HttpHeader; struct HttpHeader;
using HeaderCollection = std::vector<HttpHeader>; using HeaderCollection = std::vector<HttpHeader>;
class Context;
} }
namespace DB::S3 namespace DB::S3
@ -36,14 +37,16 @@ public:
bool is_virtual_hosted_style, bool is_virtual_hosted_style,
const String & access_key_id, const String & access_key_id,
const String & secret_access_key, const String & secret_access_key,
const RemoteHostFilter & remote_host_filter); const RemoteHostFilter & remote_host_filter,
const Context & global_context);
std::shared_ptr<Aws::S3::S3Client> create( std::shared_ptr<Aws::S3::S3Client> create(
Aws::Client::ClientConfiguration & cfg, Aws::Client::ClientConfiguration & cfg,
bool is_virtual_hosted_style, bool is_virtual_hosted_style,
const String & access_key_id, const String & access_key_id,
const String & secret_access_key, const String & secret_access_key,
const RemoteHostFilter & remote_host_filter); const RemoteHostFilter & remote_host_filter,
const Context & global_context);
std::shared_ptr<Aws::S3::S3Client> create( std::shared_ptr<Aws::S3::S3Client> create(
const String & endpoint, const String & endpoint,
@ -51,7 +54,8 @@ public:
const String & access_key_id, const String & access_key_id,
const String & secret_access_key, const String & secret_access_key,
HeaderCollection headers, HeaderCollection headers,
const RemoteHostFilter & remote_host_filter); const RemoteHostFilter & remote_host_filter,
const Context & global_context);
private: private:
ClientFactory(); ClientFactory();

View File

@ -218,7 +218,8 @@ StorageS3::StorageS3(
credentials = Aws::Auth::AWSCredentials(std::move(settings.access_key_id), std::move(settings.secret_access_key)); credentials = Aws::Auth::AWSCredentials(std::move(settings.access_key_id), std::move(settings.secret_access_key));
client = S3::ClientFactory::instance().create( client = S3::ClientFactory::instance().create(
uri_.endpoint, uri_.is_virtual_hosted_style, access_key_id_, secret_access_key_, std::move(settings.headers), context_.getRemoteHostFilter()); uri_.endpoint, uri_.is_virtual_hosted_style, access_key_id_, secret_access_key_, std::move(settings.headers),
context_.getRemoteHostFilter(), context_.getGlobalContext());
} }