mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #17220 from ianton-ru/s3_max_redirects
S3 max redirects
This commit is contained in:
commit
45f09fe293
@ -65,6 +65,7 @@ class IColumn;
|
|||||||
M(UInt64, distributed_connections_pool_size, DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE, "Maximum number of connections with one remote server in the pool.", 0) \
|
M(UInt64, distributed_connections_pool_size, DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE, "Maximum number of connections with one remote server in the pool.", 0) \
|
||||||
M(UInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.", 0) \
|
M(UInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.", 0) \
|
||||||
M(UInt64, s3_min_upload_part_size, 512*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
|
M(UInt64, s3_min_upload_part_size, 512*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
|
||||||
|
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
|
||||||
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
|
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
|
||||||
M(Bool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.", 0) \
|
M(Bool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.", 0) \
|
||||||
M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \
|
M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \
|
||||||
|
@ -132,7 +132,8 @@ void registerDiskS3(DiskFactory & factory)
|
|||||||
uri.is_virtual_hosted_style,
|
uri.is_virtual_hosted_style,
|
||||||
config.getString(config_prefix + ".access_key_id", ""),
|
config.getString(config_prefix + ".access_key_id", ""),
|
||||||
config.getString(config_prefix + ".secret_access_key", ""),
|
config.getString(config_prefix + ".secret_access_key", ""),
|
||||||
context.getRemoteHostFilter());
|
context.getRemoteHostFilter(),
|
||||||
|
context.getGlobalContext().getSettingsRef().s3_max_redirects);
|
||||||
|
|
||||||
String metadata_path = config.getString(config_prefix + ".metadata_path", context.getPath() + "disks/" + name + "/");
|
String metadata_path = config.getString(config_prefix + ".metadata_path", context.getPath() + "disks/" + name + "/");
|
||||||
|
|
||||||
|
@ -50,9 +50,11 @@ namespace DB::S3
|
|||||||
|
|
||||||
PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
|
PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
|
||||||
const Aws::Client::ClientConfiguration & cfg,
|
const Aws::Client::ClientConfiguration & cfg,
|
||||||
const RemoteHostFilter & remote_host_filter_)
|
const RemoteHostFilter & remote_host_filter_,
|
||||||
|
unsigned int s3_max_redirects_)
|
||||||
: Aws::Client::ClientConfiguration(cfg)
|
: Aws::Client::ClientConfiguration(cfg)
|
||||||
, remote_host_filter(remote_host_filter_)
|
, remote_host_filter(remote_host_filter_)
|
||||||
|
, s3_max_redirects(s3_max_redirects_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -83,6 +85,7 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & clientConfigu
|
|||||||
Poco::Timespan(clientConfiguration.httpRequestTimeoutMs * 1000) /// receive timeout.
|
Poco::Timespan(clientConfiguration.httpRequestTimeoutMs * 1000) /// receive timeout.
|
||||||
))
|
))
|
||||||
, remote_host_filter(clientConfiguration.remote_host_filter)
|
, remote_host_filter(clientConfiguration.remote_host_filter)
|
||||||
|
, s3_max_redirects(clientConfiguration.s3_max_redirects)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -157,10 +160,9 @@ void PocoHTTPClient::makeRequestInternal(
|
|||||||
|
|
||||||
ProfileEvents::increment(select_metric(S3MetricType::Count));
|
ProfileEvents::increment(select_metric(S3MetricType::Count));
|
||||||
|
|
||||||
static constexpr int max_redirect_attempts = 10;
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
for (int attempt = 0; attempt < max_redirect_attempts; ++attempt)
|
for (unsigned int attempt = 0; attempt <= s3_max_redirects; ++attempt)
|
||||||
{
|
{
|
||||||
Poco::URI poco_uri(uri);
|
Poco::URI poco_uri(uri);
|
||||||
|
|
||||||
|
@ -11,14 +11,21 @@ namespace Aws::Http::Standard
|
|||||||
class StandardHttpResponse;
|
class StandardHttpResponse;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
class Context;
|
||||||
|
}
|
||||||
|
|
||||||
namespace DB::S3
|
namespace DB::S3
|
||||||
{
|
{
|
||||||
|
|
||||||
struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
|
struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
|
||||||
{
|
{
|
||||||
const RemoteHostFilter & remote_host_filter;
|
const RemoteHostFilter & remote_host_filter;
|
||||||
|
unsigned int s3_max_redirects;
|
||||||
|
|
||||||
PocoHTTPClientConfiguration(const Aws::Client::ClientConfiguration & cfg, const RemoteHostFilter & remote_host_filter_);
|
PocoHTTPClientConfiguration(const Aws::Client::ClientConfiguration & cfg, const RemoteHostFilter & remote_host_filter_,
|
||||||
|
unsigned int s3_max_redirects_);
|
||||||
|
|
||||||
void updateSchemeAndRegion();
|
void updateSchemeAndRegion();
|
||||||
};
|
};
|
||||||
@ -48,6 +55,7 @@ private:
|
|||||||
std::function<Aws::Client::ClientConfigurationPerRequest(const Aws::Http::HttpRequest &)> per_request_configuration;
|
std::function<Aws::Client::ClientConfigurationPerRequest(const Aws::Http::HttpRequest &)> per_request_configuration;
|
||||||
ConnectionTimeouts timeouts;
|
ConnectionTimeouts timeouts;
|
||||||
const RemoteHostFilter & remote_host_filter;
|
const RemoteHostFilter & remote_host_filter;
|
||||||
|
unsigned int s3_max_redirects;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -164,14 +164,15 @@ namespace S3
|
|||||||
bool is_virtual_hosted_style,
|
bool is_virtual_hosted_style,
|
||||||
const String & access_key_id,
|
const String & access_key_id,
|
||||||
const String & secret_access_key,
|
const String & secret_access_key,
|
||||||
const RemoteHostFilter & remote_host_filter)
|
const RemoteHostFilter & remote_host_filter,
|
||||||
|
unsigned int s3_max_redirects)
|
||||||
{
|
{
|
||||||
Aws::Client::ClientConfiguration cfg;
|
Aws::Client::ClientConfiguration cfg;
|
||||||
|
|
||||||
if (!endpoint.empty())
|
if (!endpoint.empty())
|
||||||
cfg.endpointOverride = endpoint;
|
cfg.endpointOverride = endpoint;
|
||||||
|
|
||||||
return create(cfg, is_virtual_hosted_style, access_key_id, secret_access_key, remote_host_filter);
|
return create(cfg, is_virtual_hosted_style, access_key_id, secret_access_key, remote_host_filter, s3_max_redirects);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::shared_ptr<Aws::S3::S3Client> ClientFactory::create( // NOLINT
|
std::shared_ptr<Aws::S3::S3Client> ClientFactory::create( // NOLINT
|
||||||
@ -179,11 +180,12 @@ namespace S3
|
|||||||
bool is_virtual_hosted_style,
|
bool is_virtual_hosted_style,
|
||||||
const String & access_key_id,
|
const String & access_key_id,
|
||||||
const String & secret_access_key,
|
const String & secret_access_key,
|
||||||
const RemoteHostFilter & remote_host_filter)
|
const RemoteHostFilter & remote_host_filter,
|
||||||
|
unsigned int s3_max_redirects)
|
||||||
{
|
{
|
||||||
Aws::Auth::AWSCredentials credentials(access_key_id, secret_access_key);
|
Aws::Auth::AWSCredentials credentials(access_key_id, secret_access_key);
|
||||||
|
|
||||||
PocoHTTPClientConfiguration client_configuration(cfg, remote_host_filter);
|
PocoHTTPClientConfiguration client_configuration(cfg, remote_host_filter, s3_max_redirects);
|
||||||
|
|
||||||
client_configuration.updateSchemeAndRegion();
|
client_configuration.updateSchemeAndRegion();
|
||||||
|
|
||||||
@ -201,9 +203,10 @@ namespace S3
|
|||||||
const String & access_key_id,
|
const String & access_key_id,
|
||||||
const String & secret_access_key,
|
const String & secret_access_key,
|
||||||
HeaderCollection headers,
|
HeaderCollection headers,
|
||||||
const RemoteHostFilter & remote_host_filter)
|
const RemoteHostFilter & remote_host_filter,
|
||||||
|
unsigned int s3_max_redirects)
|
||||||
{
|
{
|
||||||
PocoHTTPClientConfiguration client_configuration({}, remote_host_filter);
|
PocoHTTPClientConfiguration client_configuration({}, remote_host_filter, s3_max_redirects);
|
||||||
|
|
||||||
if (!endpoint.empty())
|
if (!endpoint.empty())
|
||||||
client_configuration.endpointOverride = endpoint;
|
client_configuration.endpointOverride = endpoint;
|
||||||
|
@ -36,14 +36,16 @@ public:
|
|||||||
bool is_virtual_hosted_style,
|
bool is_virtual_hosted_style,
|
||||||
const String & access_key_id,
|
const String & access_key_id,
|
||||||
const String & secret_access_key,
|
const String & secret_access_key,
|
||||||
const RemoteHostFilter & remote_host_filter);
|
const RemoteHostFilter & remote_host_filter,
|
||||||
|
unsigned int s3_max_redirects);
|
||||||
|
|
||||||
std::shared_ptr<Aws::S3::S3Client> create(
|
std::shared_ptr<Aws::S3::S3Client> create(
|
||||||
Aws::Client::ClientConfiguration & cfg,
|
Aws::Client::ClientConfiguration & cfg,
|
||||||
bool is_virtual_hosted_style,
|
bool is_virtual_hosted_style,
|
||||||
const String & access_key_id,
|
const String & access_key_id,
|
||||||
const String & secret_access_key,
|
const String & secret_access_key,
|
||||||
const RemoteHostFilter & remote_host_filter);
|
const RemoteHostFilter & remote_host_filter,
|
||||||
|
unsigned int s3_max_redirects);
|
||||||
|
|
||||||
std::shared_ptr<Aws::S3::S3Client> create(
|
std::shared_ptr<Aws::S3::S3Client> create(
|
||||||
const String & endpoint,
|
const String & endpoint,
|
||||||
@ -51,7 +53,8 @@ public:
|
|||||||
const String & access_key_id,
|
const String & access_key_id,
|
||||||
const String & secret_access_key,
|
const String & secret_access_key,
|
||||||
HeaderCollection headers,
|
HeaderCollection headers,
|
||||||
const RemoteHostFilter & remote_host_filter);
|
const RemoteHostFilter & remote_host_filter,
|
||||||
|
unsigned int s3_max_redirects);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
ClientFactory();
|
ClientFactory();
|
||||||
|
@ -216,7 +216,8 @@ StorageS3::StorageS3(
|
|||||||
credentials = Aws::Auth::AWSCredentials(std::move(settings.access_key_id), std::move(settings.secret_access_key));
|
credentials = Aws::Auth::AWSCredentials(std::move(settings.access_key_id), std::move(settings.secret_access_key));
|
||||||
|
|
||||||
client = S3::ClientFactory::instance().create(
|
client = S3::ClientFactory::instance().create(
|
||||||
uri_.endpoint, uri_.is_virtual_hosted_style, access_key_id_, secret_access_key_, std::move(settings.headers), context_.getRemoteHostFilter());
|
uri_.endpoint, uri_.is_virtual_hosted_style, access_key_id_, secret_access_key_, std::move(settings.headers),
|
||||||
|
context_.getRemoteHostFilter(), context_.getGlobalContext().getSettingsRef().s3_max_redirects);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -0,0 +1,7 @@
|
|||||||
|
<yandex>
|
||||||
|
<profiles>
|
||||||
|
<default>
|
||||||
|
<s3_max_redirects>0</s3_max_redirects>
|
||||||
|
</default>
|
||||||
|
</profiles>
|
||||||
|
</yandex>
|
@ -85,6 +85,7 @@ def cluster():
|
|||||||
cluster.add_instance("restricted_dummy", main_configs=["configs/config_for_test_remote_host_filter.xml"],
|
cluster.add_instance("restricted_dummy", main_configs=["configs/config_for_test_remote_host_filter.xml"],
|
||||||
with_minio=True)
|
with_minio=True)
|
||||||
cluster.add_instance("dummy", with_minio=True, main_configs=["configs/defaultS3.xml"])
|
cluster.add_instance("dummy", with_minio=True, main_configs=["configs/defaultS3.xml"])
|
||||||
|
cluster.add_instance("s3_max_redirects", with_minio=True, main_configs=["configs/defaultS3.xml"], user_configs=["configs/s3_max_redirects.xml"])
|
||||||
logging.info("Starting cluster...")
|
logging.info("Starting cluster...")
|
||||||
cluster.start()
|
cluster.start()
|
||||||
logging.info("Cluster started")
|
logging.info("Cluster started")
|
||||||
@ -224,6 +225,34 @@ def test_put_get_with_redirect(cluster):
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
# Test put with restricted S3 server redirect.
|
||||||
|
def test_put_with_zero_redirect(cluster):
|
||||||
|
# type: (ClickHouseCluster) -> None
|
||||||
|
|
||||||
|
bucket = cluster.minio_bucket
|
||||||
|
instance = cluster.instances["s3_max_redirects"] # type: ClickHouseInstance
|
||||||
|
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||||
|
values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
|
||||||
|
filename = "test.csv"
|
||||||
|
|
||||||
|
# Should work without redirect
|
||||||
|
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
|
||||||
|
cluster.minio_host, cluster.minio_port, bucket, filename, table_format, values)
|
||||||
|
run_query(instance, query)
|
||||||
|
|
||||||
|
# Should not work with redirect
|
||||||
|
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
|
||||||
|
cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, table_format, values)
|
||||||
|
exception_raised = False
|
||||||
|
try:
|
||||||
|
run_query(instance, query)
|
||||||
|
except Exception as e:
|
||||||
|
assert str(e).find("Too many redirects while trying to access") != -1
|
||||||
|
exception_raised = True
|
||||||
|
finally:
|
||||||
|
assert exception_raised
|
||||||
|
|
||||||
|
|
||||||
def test_put_get_with_globs(cluster):
|
def test_put_get_with_globs(cluster):
|
||||||
# type: (ClickHouseCluster) -> None
|
# type: (ClickHouseCluster) -> None
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user