Add global proxy setting (#51749)

* initial impl

* fix env ut

* move ut directory

* make sure no null proxy resolver is returned by ProxyConfigurationResolverProvider

* minor adjustment

* add a few tests, still incomplete

* add proxy support for url table function

* use proxy for select from url as well

* remove optional from return type, just returns empty config

* fix style

* style

* black

* ohg boy

* rm in progress file

* god pls don't let me kill anyone

* ...

* add use_aws guards

* remove hard coded s3 proxy resolver

* add concurrency-mt-unsafe

* aa

* black

* add logging back

* revert change

* imrpove code a bit

* helper functions and separate tests

* for some reason, this env test is not working..

* formatting

* :)

* clangtidy

* lint

* revert some stupid things

* small test adjusmtments

* simplify tests

* rename test

* remove extra line

* freaking style change

* simplify a bit

* fix segfault & remove an extra call

* tightly couple proxy provider with context..

* remove useless include

* rename config prefix parameter

* simplify provider a bit

* organize provider a bit

* add a few comments

* comment out proxy env tests

* fix nullptr in unit tests

* make sure old storage proxy config is properly covered without global context instance

* move a few functions from class to anonymous namespace

* fix no fallback for specific storage conf

* change API to accept http method instead of bool

* implement http/https distinction in listresolver, any still not implemented

* implement http/https distinction in remote resolver

* progress on code, improve tests and add url function working test

* use protcol instead of method for http and https

* small fix

* few more adjustments

* fix style

* black

* move enum to proxyconfiguration

* wip

* fix build

* fix ut

* delete atomicroundrobin class

* remove stale include

* add some tests.. need to spend some more time on the design..

* change design a bit

* progress

* use existing context for tests

* rename aux function and fix ut

* ..

* rename test

* try to simplify tests a bit

* simplify tests a bit more

* attempt to fix tests, accept more than one remote resolver

* use proper log id

* try waiting for resolver

* proper wait logic

* black

* empty

* address a few comments

* refactor tests

* remove old tests

* baclk

* use RAII to set/unset env

* black

* clang tidy

* fix env proxy not respecting any

* use log trace

* fix wrong logic in getRemoteREsolver

* fix wrong logic in getRemoteREsolver

* fix test

* remove unwanted code

* remove ClientConfigurationperRequest and auxilary classes

* remove unwanted code

* remove adapter test

* few adjustments and add test for s3 storage conf  with new proxy settings

* black

* use chassert for context

* Add getenv comment
This commit is contained in:
Arthur Passos 2023-08-24 10:07:26 -03:00 committed by GitHub
parent 8555250277
commit 2bade7db08
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
69 changed files with 1576 additions and 303 deletions

View File

@ -133,8 +133,6 @@ if (BUILD_STANDALONE_KEEPER)
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/S3Capabilities.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/diskSettings.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/ProxyListConfiguration.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/ProxyResolverConfiguration.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/createReadBufferFromFileBase.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/ReadBufferFromRemoteFSGather.cpp

View File

@ -50,7 +50,8 @@ namespace
context->getRemoteHostFilter(),
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_max_redirects),
context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
/* for_disk_s3 = */ false, settings.request_settings.get_request_throttler, settings.request_settings.put_request_throttler);
/* for_disk_s3 = */ false, settings.request_settings.get_request_throttler, settings.request_settings.put_request_throttler,
s3_uri.uri.getScheme());
client_configuration.endpointOverride = s3_uri.endpoint;
client_configuration.maxConnections = static_cast<unsigned>(context->getSettingsRef().s3_max_connections);

View File

@ -0,0 +1,74 @@
#include "EnvironmentProxyConfigurationResolver.h"
#include <Common/logger_useful.h>
#include <Poco/URI.h>
namespace DB
{
/*
* Usually environment variables are upper-case, but it seems like proxy related variables are an exception.
* See https://unix.stackexchange.com/questions/212894/whats-the-right-format-for-the-http-proxy-environment-variable-caps-or-no-ca/212972#212972
* */
static constexpr auto PROXY_HTTP_ENVIRONMENT_VARIABLE = "http_proxy";
static constexpr auto PROXY_HTTPS_ENVIRONMENT_VARIABLE = "https_proxy";
EnvironmentProxyConfigurationResolver::EnvironmentProxyConfigurationResolver(Protocol protocol_)
: protocol(protocol_)
{}
namespace
{
const char * getProxyHost(DB::ProxyConfiguration::Protocol protocol)
{
/*
* getenv is safe to use here because ClickHouse code does not make any call to `setenv` or `putenv`
* aside from tests and a very early call during startup: https://github.com/ClickHouse/ClickHouse/blob/master/src/Daemon/BaseDaemon.cpp#L791
* */
if (protocol == DB::ProxyConfiguration::Protocol::HTTP)
{
return std::getenv(PROXY_HTTP_ENVIRONMENT_VARIABLE); // NOLINT(concurrency-mt-unsafe)
}
else if (protocol == DB::ProxyConfiguration::Protocol::HTTPS)
{
return std::getenv(PROXY_HTTPS_ENVIRONMENT_VARIABLE); // NOLINT(concurrency-mt-unsafe)
}
else
{
if (const char * http_proxy_host = std::getenv(PROXY_HTTP_ENVIRONMENT_VARIABLE)) // NOLINT(concurrency-mt-unsafe)
{
return http_proxy_host;
}
else
{
return std::getenv(PROXY_HTTPS_ENVIRONMENT_VARIABLE); // NOLINT(concurrency-mt-unsafe)
}
}
}
}
ProxyConfiguration EnvironmentProxyConfigurationResolver::resolve()
{
const auto * proxy_host = getProxyHost(protocol);
if (!proxy_host)
{
return {};
}
auto uri = Poco::URI(proxy_host);
auto host = uri.getHost();
auto scheme = uri.getScheme();
auto port = uri.getPort();
LOG_TRACE(&Poco::Logger::get("EnvironmentProxyConfigurationResolver"), "Use proxy from environment: {}://{}:{}", scheme, host, port);
return ProxyConfiguration {
host,
ProxyConfiguration::protocolFromString(scheme),
port
};
}
}

View File

@ -0,0 +1,23 @@
#pragma once
#include <Common/ProxyConfigurationResolver.h>
namespace DB
{
/*
* Grabs proxy configuration from environment variables (http_proxy and https_proxy).
* */
class EnvironmentProxyConfigurationResolver : public ProxyConfigurationResolver
{
public:
explicit EnvironmentProxyConfigurationResolver(Protocol protocol_);
ProxyConfiguration resolve() override;
void errorReport(const ProxyConfiguration &) override {}
private:
Protocol protocol;
};
}

View File

@ -0,0 +1,51 @@
#pragma once
#include <string>
namespace DB
{
struct ProxyConfiguration
{
enum class Protocol
{
HTTP,
HTTPS,
ANY
};
static auto protocolFromString(const std::string & str)
{
if (str == "http")
{
return Protocol::HTTP;
}
else if (str == "https")
{
return Protocol::HTTPS;
}
else
{
return Protocol::ANY;
}
}
static auto protocolToString(Protocol protocol)
{
switch (protocol)
{
case Protocol::HTTP:
return "http";
case Protocol::HTTPS:
return "https";
case Protocol::ANY:
return "any";
}
}
std::string host;
Protocol protocol;
uint16_t port;
};
}

View File

@ -0,0 +1,17 @@
#pragma once
#include <Common/ProxyConfiguration.h>
namespace DB
{
struct ProxyConfigurationResolver
{
using Protocol = ProxyConfiguration::Protocol;
virtual ~ProxyConfigurationResolver() = default;
virtual ProxyConfiguration resolve() = 0;
virtual void errorReport(const ProxyConfiguration & config) = 0;
};
}

View File

@ -0,0 +1,208 @@
#include <Common/ProxyConfigurationResolverProvider.h>
#include <Common/EnvironmentProxyConfigurationResolver.h>
#include <Common/Exception.h>
#include <Common/ProxyListConfigurationResolver.h>
#include <Common/RemoteProxyConfigurationResolver.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/logger_useful.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
std::shared_ptr<ProxyConfigurationResolver> getRemoteResolver(
const String & config_prefix, const Poco::Util::AbstractConfiguration & configuration)
{
auto endpoint = Poco::URI(configuration.getString(config_prefix + ".endpoint"));
auto proxy_scheme = configuration.getString(config_prefix + ".proxy_scheme");
if (proxy_scheme != "http" && proxy_scheme != "https")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Only HTTP/HTTPS schemas allowed in proxy resolver config: {}", proxy_scheme);
auto proxy_port = configuration.getUInt(config_prefix + ".proxy_port");
auto cache_ttl = configuration.getUInt(config_prefix + ".proxy_cache_time", 10);
LOG_DEBUG(&Poco::Logger::get("ProxyConfigurationResolverProvider"), "Configured remote proxy resolver: {}, Scheme: {}, Port: {}",
endpoint.toString(), proxy_scheme, proxy_port);
return std::make_shared<RemoteProxyConfigurationResolver>(endpoint, proxy_scheme, proxy_port, cache_ttl);
}
std::shared_ptr<ProxyConfigurationResolver> getRemoteResolver(
ProxyConfiguration::Protocol protocol, const String & config_prefix, const Poco::Util::AbstractConfiguration & configuration)
{
std::vector<String> keys;
configuration.keys(config_prefix, keys);
std::vector<Poco::URI> uris;
for (const auto & key : keys)
{
if (startsWith(key, "resolver"))
{
auto prefix_with_key = config_prefix + "." + key;
auto proxy_scheme_config_string = prefix_with_key + ".proxy_scheme";
auto config_protocol = configuration.getString(proxy_scheme_config_string);
if (ProxyConfiguration::Protocol::ANY == protocol || config_protocol == ProxyConfiguration::protocolToString(protocol))
{
return getRemoteResolver(prefix_with_key, configuration);
}
}
}
return nullptr;
}
auto extractURIList(const String & config_prefix, const Poco::Util::AbstractConfiguration & configuration)
{
std::vector<String> keys;
configuration.keys(config_prefix, keys);
std::vector<Poco::URI> uris;
for (const auto & key : keys)
{
if (startsWith(key, "uri"))
{
Poco::URI proxy_uri(configuration.getString(config_prefix + "." + key));
if (proxy_uri.getScheme() != "http" && proxy_uri.getScheme() != "https")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Only HTTP/HTTPS schemas allowed in proxy uri: {}", proxy_uri.toString());
if (proxy_uri.getHost().empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Empty host in proxy uri: {}", proxy_uri.toString());
uris.push_back(proxy_uri);
LOG_DEBUG(&Poco::Logger::get("ProxyConfigurationResolverProvider"), "Configured proxy: {}", proxy_uri.toString());
}
}
return uris;
}
std::shared_ptr<ProxyConfigurationResolver> getListResolverNewSyntax(
ProxyConfiguration::Protocol protocol,
const String & config_prefix,
const Poco::Util::AbstractConfiguration & configuration
)
{
std::vector<Poco::URI> uris;
bool include_http_uris = ProxyConfiguration::Protocol::ANY == protocol || ProxyConfiguration::Protocol::HTTP == protocol;
if (include_http_uris && configuration.has(config_prefix + ".http"))
{
auto http_uris = extractURIList(config_prefix + ".http", configuration);
uris.insert(uris.end(), http_uris.begin(), http_uris.end());
}
bool include_https_uris = ProxyConfiguration::Protocol::ANY == protocol || ProxyConfiguration::Protocol::HTTPS == protocol;
if (include_https_uris && configuration.has(config_prefix + ".https"))
{
auto https_uris = extractURIList(config_prefix + ".https", configuration);
uris.insert(uris.end(), https_uris.begin(), https_uris.end());
}
return uris.empty() ? nullptr : std::make_shared<ProxyListConfigurationResolver>(uris);
}
std::shared_ptr<ProxyConfigurationResolver> getListResolverOldSyntax(
const String & config_prefix,
const Poco::Util::AbstractConfiguration & configuration
)
{
auto uris = extractURIList(config_prefix, configuration);
return uris.empty() ? nullptr : std::make_shared<ProxyListConfigurationResolver>(uris);
}
std::shared_ptr<ProxyConfigurationResolver> getListResolver(
ProxyConfiguration::Protocol protocol, const String & config_prefix, const Poco::Util::AbstractConfiguration & configuration
)
{
std::vector<String> keys;
configuration.keys(config_prefix, keys);
bool new_setting_syntax = std::find_if(
keys.begin(),
keys.end(),
[](const String & key)
{
return startsWith(key, "http") || startsWith(key, "https");
}) != keys.end();
return new_setting_syntax ? getListResolverNewSyntax(protocol, config_prefix, configuration)
: getListResolverOldSyntax(config_prefix, configuration);
}
}
std::shared_ptr<ProxyConfigurationResolver> ProxyConfigurationResolverProvider::get(Protocol protocol)
{
auto context = Context::getGlobalContextInstance();
chassert(context);
if (auto resolver = getFromSettings(protocol, "", context->getConfigRef()))
{
return resolver;
}
return std::make_shared<EnvironmentProxyConfigurationResolver>(protocol);
}
std::shared_ptr<ProxyConfigurationResolver> ProxyConfigurationResolverProvider::getFromSettings(
Protocol protocol,
const String & config_prefix,
const Poco::Util::AbstractConfiguration & configuration
)
{
auto proxy_prefix = config_prefix.empty() ? "proxy" : config_prefix + ".proxy";
if (configuration.has(proxy_prefix))
{
std::vector<String> config_keys;
configuration.keys(proxy_prefix, config_keys);
if (auto remote_resolver = getRemoteResolver(protocol, proxy_prefix, configuration))
{
return remote_resolver;
}
if (auto list_resolver = getListResolver(protocol, proxy_prefix, configuration))
{
return list_resolver;
}
}
return nullptr;
}
std::shared_ptr<ProxyConfigurationResolver> ProxyConfigurationResolverProvider::getFromOldSettingsFormat(
const String & config_prefix,
const Poco::Util::AbstractConfiguration & configuration
)
{
/*
* First try to get it from settings only using the combination of config_prefix and configuration.
* This logic exists for backward compatibility with old S3 storage specific proxy configuration.
* */
if (auto resolver = ProxyConfigurationResolverProvider::getFromSettings(Protocol::ANY, config_prefix, configuration))
{
return resolver;
}
/*
* In case the combination of config_prefix and configuration does not provide a resolver, try to get it from general / new settings.
* Falls back to Environment resolver if no configuration is found.
* */
return ProxyConfigurationResolverProvider::get(Protocol::ANY);
}
}

View File

@ -0,0 +1,40 @@
#pragma once
#include <base/types.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/ProxyConfigurationResolver.h>
namespace DB
{
class ProxyConfigurationResolverProvider
{
public:
using Protocol = ProxyConfiguration::Protocol;
/*
* Returns appropriate ProxyConfigurationResolver based on current CH settings (Remote resolver or List resolver).
* If no configuration is found, returns Environment Resolver.
* */
static std::shared_ptr<ProxyConfigurationResolver> get(Protocol protocol);
/*
* This API exists exclusively for backward compatibility with old S3 storage specific proxy configuration.
* If no configuration is found, returns nullptr.
* */
static std::shared_ptr<ProxyConfigurationResolver> getFromOldSettingsFormat(
const String & config_prefix,
const Poco::Util::AbstractConfiguration & configuration
);
private:
static std::shared_ptr<ProxyConfigurationResolver> getFromSettings(
Protocol protocol,
const String & config_prefix,
const Poco::Util::AbstractConfiguration & configuration
);
};
}

View File

@ -0,0 +1,31 @@
#include <Common/ProxyListConfigurationResolver.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/logger_useful.h>
#include <Poco/URI.h>
namespace DB
{
ProxyListConfigurationResolver::ProxyListConfigurationResolver(std::vector<Poco::URI> proxies_)
: proxies(std::move(proxies_))
{
}
ProxyConfiguration ProxyListConfigurationResolver::resolve()
{
if (proxies.empty())
{
return {};
}
/// Avoid atomic increment if number of proxies is 1.
size_t index = proxies.size() > 1 ? (access_counter++) % proxies.size() : 0;
auto & proxy = proxies[index];
LOG_DEBUG(&Poco::Logger::get("ProxyListConfigurationResolver"), "Use proxy: {}", proxies[index].toString());
return ProxyConfiguration {proxy.getHost(), ProxyConfiguration::protocolFromString(proxy.getScheme()), proxy.getPort()};
}
}

View File

@ -0,0 +1,31 @@
#pragma once
#include <base/types.h>
#include <Common/ProxyConfigurationResolver.h>
#include <Poco/URI.h>
namespace DB
{
/*
* Round-robin proxy list resolver.
* */
class ProxyListConfigurationResolver : public ProxyConfigurationResolver
{
public:
explicit ProxyListConfigurationResolver(std::vector<Poco::URI> proxies_);
ProxyConfiguration resolve() override;
void errorReport(const ProxyConfiguration &) override {}
private:
std::vector<Poco::URI> proxies;
/// Access counter to get proxy using round-robin strategy.
std::atomic<size_t> access_counter;
};
}

View File

@ -1,32 +1,36 @@
#include "ProxyResolverConfiguration.h"
#if USE_AWS_S3
#include <Common/RemoteProxyConfigurationResolver.h>
#include <utility>
#include <IO/HTTPCommon.h>
#include "Poco/StreamCopier.h"
#include <Poco/StreamCopier.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Common/logger_useful.h>
#include <Common/DNSResolver.h>
namespace DB::ErrorCodes
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace DB::S3
{
ProxyResolverConfiguration::ProxyResolverConfiguration(const Poco::URI & endpoint_, String proxy_scheme_
, unsigned proxy_port_, unsigned cache_ttl_)
: endpoint(endpoint_), proxy_scheme(std::move(proxy_scheme_)), proxy_port(proxy_port_), cache_ttl(cache_ttl_)
RemoteProxyConfigurationResolver::RemoteProxyConfigurationResolver(
const Poco::URI & endpoint_,
String proxy_protocol_,
unsigned proxy_port_,
unsigned cache_ttl_
)
: endpoint(endpoint_), proxy_protocol(std::move(proxy_protocol_)), proxy_port(proxy_port_), cache_ttl(cache_ttl_)
{
}
ClientConfigurationPerRequest ProxyResolverConfiguration::getConfiguration(const Aws::Http::HttpRequest &)
ProxyConfiguration RemoteProxyConfigurationResolver::resolve()
{
LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Obtain proxy using resolver: {}", endpoint.toString());
auto * logger = &Poco::Logger::get("RemoteProxyConfigurationResolver");
LOG_DEBUG(logger, "Obtain proxy using resolver: {}", endpoint.toString());
std::lock_guard lock(cache_mutex);
@ -34,7 +38,12 @@ ClientConfigurationPerRequest ProxyResolverConfiguration::getConfiguration(const
if (cache_ttl.count() && cache_valid && now <= cache_timestamp + cache_ttl && now >= cache_timestamp)
{
LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Use cached proxy: {}://{}:{}", Aws::Http::SchemeMapper::ToString(cached_config.proxy_scheme), cached_config.proxy_host, cached_config.proxy_port);
LOG_DEBUG(logger,
"Use cached proxy: {}://{}:{}",
cached_config.protocol,
cached_config.host,
cached_config.port
);
return cached_config;
}
@ -84,11 +93,11 @@ ClientConfigurationPerRequest ProxyResolverConfiguration::getConfiguration(const
/// Read proxy host as string from response body.
Poco::StreamCopier::copyToString(response_body_stream, proxy_host);
LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Use proxy: {}://{}:{}", proxy_scheme, proxy_host, proxy_port);
LOG_DEBUG(logger, "Use proxy: {}://{}:{}", proxy_protocol, proxy_host, proxy_port);
cached_config.proxy_scheme = Aws::Http::SchemeMapper::FromString(proxy_scheme.c_str());
cached_config.proxy_host = proxy_host;
cached_config.proxy_port = proxy_port;
cached_config.protocol = ProxyConfiguration::protocolFromString(proxy_protocol);
cached_config.host = proxy_host;
cached_config.port = proxy_port;
cache_timestamp = std::chrono::system_clock::now();
cache_valid = true;
@ -96,16 +105,14 @@ ClientConfigurationPerRequest ProxyResolverConfiguration::getConfiguration(const
}
catch (...)
{
tryLogCurrentException("AWSClient", "Failed to obtain proxy");
/// Don't use proxy if it can't be obtained.
ClientConfigurationPerRequest cfg;
return cfg;
tryLogCurrentException("RemoteProxyConfigurationResolver", "Failed to obtain proxy");
return {};
}
}
void ProxyResolverConfiguration::errorReport(const ClientConfigurationPerRequest & config)
void RemoteProxyConfigurationResolver::errorReport(const ProxyConfiguration & config)
{
if (config.proxy_host.empty())
if (config.host.empty())
return;
std::lock_guard lock(cache_mutex);
@ -113,8 +120,8 @@ void ProxyResolverConfiguration::errorReport(const ClientConfigurationPerRequest
if (!cache_ttl.count() || !cache_valid)
return;
if (std::tie(cached_config.proxy_scheme, cached_config.proxy_host, cached_config.proxy_port)
!= std::tie(config.proxy_scheme, config.proxy_host, config.proxy_port))
if (std::tie(cached_config.protocol, cached_config.host, cached_config.port)
!= std::tie(config.protocol, config.host, config.port))
return;
/// Invalidate cached proxy when got error with this proxy
@ -122,5 +129,3 @@ void ProxyResolverConfiguration::errorReport(const ClientConfigurationPerRequest
}
}
#endif

View File

@ -0,0 +1,46 @@
#pragma once
#include <base/types.h>
#include <mutex>
#include <Common/ProxyConfigurationResolver.h>
#include <Poco/URI.h>
namespace DB
{
/*
* Makes an HTTP GET request to the specified endpoint to obtain a proxy host.
* */
class RemoteProxyConfigurationResolver : public ProxyConfigurationResolver
{
public:
RemoteProxyConfigurationResolver(
const Poco::URI & endpoint_,
String proxy_protocol_,
unsigned proxy_port_,
unsigned cache_ttl_
);
ProxyConfiguration resolve() override;
void errorReport(const ProxyConfiguration & config) override;
private:
/// Endpoint to obtain a proxy host.
const Poco::URI endpoint;
/// Scheme for obtained proxy.
const String proxy_protocol;
/// Port for obtained proxy.
const unsigned proxy_port;
std::mutex cache_mutex;
bool cache_valid = false;
std::chrono::time_point<std::chrono::system_clock> cache_timestamp;
const std::chrono::seconds cache_ttl{0};
ProxyConfiguration cached_config;
};
}

View File

@ -73,3 +73,25 @@ inline std::string xmlNodeAsString(Poco::XML::Node *pNode)
result += ("</"+ node_name + ">\n");
return Poco::XML::fromXMLString(result);
}
struct EnvironmentProxySetter
{
EnvironmentProxySetter(const Poco::URI & http_proxy, const Poco::URI & https_proxy)
{
if (!http_proxy.empty())
{
setenv("http_proxy", http_proxy.toString().c_str(), 1); // NOLINT(concurrency-mt-unsafe)
}
if (!https_proxy.empty())
{
setenv("https_proxy", https_proxy.toString().c_str(), 1); // NOLINT(concurrency-mt-unsafe)
}
}
~EnvironmentProxySetter()
{
unsetenv("http_proxy"); // NOLINT(concurrency-mt-unsafe)
unsetenv("https_proxy"); // NOLINT(concurrency-mt-unsafe)
}
};

View File

@ -0,0 +1,122 @@
#include <gtest/gtest.h>
#include <Common/ProxyConfigurationResolverProvider.h>
#include <Common/tests/gtest_global_context.h>
#include <Common/tests/gtest_helper_functions.h>
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
class ProxyConfigurationResolverProviderTests : public ::testing::Test
{
protected:
static void SetUpTestSuite() {
context = getContext().context;
}
static void TearDownTestSuite() {
context->setConfig(Poco::AutoPtr(new Poco::Util::MapConfiguration()));
}
static DB::ContextMutablePtr context;
};
DB::ContextMutablePtr ProxyConfigurationResolverProviderTests::context;
Poco::URI http_env_proxy_server = Poco::URI("http://http_environment_proxy:3128");
Poco::URI https_env_proxy_server = Poco::URI("http://https_environment_proxy:3128");
Poco::URI http_list_proxy_server = Poco::URI("http://http_list_proxy:3128");
Poco::URI https_list_proxy_server = Poco::URI("http://https_list_proxy:3128");
TEST_F(ProxyConfigurationResolverProviderTests, EnvironmentResolverShouldBeUsedIfNoSettings)
{
EnvironmentProxySetter setter(http_env_proxy_server, https_env_proxy_server);
auto http_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP)->resolve();
auto https_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS)->resolve();
ASSERT_EQ(http_configuration.host, http_env_proxy_server.getHost());
ASSERT_EQ(http_configuration.port, http_env_proxy_server.getPort());
ASSERT_EQ(http_configuration.protocol, DB::ProxyConfiguration::protocolFromString(http_env_proxy_server.getScheme()));
ASSERT_EQ(https_configuration.host, https_env_proxy_server.getHost());
ASSERT_EQ(https_configuration.port, https_env_proxy_server.getPort());
ASSERT_EQ(https_configuration.protocol, DB::ProxyConfiguration::protocolFromString(https_env_proxy_server.getScheme()));
}
TEST_F(ProxyConfigurationResolverProviderTests, ListHTTPOnly)
{
ConfigurationPtr config = Poco::AutoPtr(new Poco::Util::MapConfiguration());
config->setString("proxy", "");
config->setString("proxy.http", "");
config->setString("proxy.http.uri", http_list_proxy_server.toString());
context->setConfig(config);
auto http_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP)->resolve();
ASSERT_EQ(http_proxy_configuration.host, http_list_proxy_server.getHost());
ASSERT_EQ(http_proxy_configuration.port, http_list_proxy_server.getPort());
ASSERT_EQ(http_proxy_configuration.protocol, DB::ProxyConfiguration::protocolFromString(http_list_proxy_server.getScheme()));
auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS)->resolve();
// No https configuration since it's not set
ASSERT_EQ(https_proxy_configuration.host, "");
ASSERT_EQ(https_proxy_configuration.port, 0);
}
TEST_F(ProxyConfigurationResolverProviderTests, ListHTTPSOnly)
{
ConfigurationPtr config = Poco::AutoPtr(new Poco::Util::MapConfiguration());
config->setString("proxy", "");
config->setString("proxy.https", "");
config->setString("proxy.https.uri", https_list_proxy_server.toString());
context->setConfig(config);
auto http_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP)->resolve();
ASSERT_EQ(http_proxy_configuration.host, "");
ASSERT_EQ(http_proxy_configuration.port, 0);
auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS)->resolve();
ASSERT_EQ(https_proxy_configuration.host, https_list_proxy_server.getHost());
// still HTTP because the proxy host is not HTTPS
ASSERT_EQ(https_proxy_configuration.protocol, DB::ProxyConfiguration::protocolFromString(https_list_proxy_server.getScheme()));
ASSERT_EQ(https_proxy_configuration.port, https_list_proxy_server.getPort());
}
TEST_F(ProxyConfigurationResolverProviderTests, ListBoth)
{
ConfigurationPtr config = Poco::AutoPtr(new Poco::Util::MapConfiguration());
config->setString("proxy", "");
config->setString("proxy.http", "");
config->setString("proxy.http.uri", http_list_proxy_server.toString());
config->setString("proxy", "");
config->setString("proxy.https", "");
config->setString("proxy.https.uri", https_list_proxy_server.toString());
context->setConfig(config);
auto http_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP)->resolve();
ASSERT_EQ(http_proxy_configuration.host, http_list_proxy_server.getHost());
ASSERT_EQ(http_proxy_configuration.protocol, DB::ProxyConfiguration::protocolFromString(http_list_proxy_server.getScheme()));
ASSERT_EQ(http_proxy_configuration.port, http_list_proxy_server.getPort());
auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS)->resolve();
ASSERT_EQ(https_proxy_configuration.host, https_list_proxy_server.getHost());
// still HTTP because the proxy host is not HTTPS
ASSERT_EQ(https_proxy_configuration.protocol, DB::ProxyConfiguration::protocolFromString(https_list_proxy_server.getScheme()));
ASSERT_EQ(https_proxy_configuration.port, https_list_proxy_server.getPort());
}
// remote resolver is tricky to be tested in unit tests

View File

@ -0,0 +1,96 @@
#include <gtest/gtest.h>
#include <Common/EnvironmentProxyConfigurationResolver.h>
#include <Common/tests/gtest_helper_functions.h>
#include <Poco/URI.h>
namespace
{
auto http_proxy_server = Poco::URI("http://proxy_server:3128");
auto https_proxy_server = Poco::URI("https://proxy_server:3128");
}
TEST(EnvironmentProxyConfigurationResolver, TestHTTP)
{
EnvironmentProxySetter setter(http_proxy_server, {});
DB::EnvironmentProxyConfigurationResolver resolver(DB::ProxyConfiguration::Protocol::HTTP);
auto configuration = resolver.resolve();
ASSERT_EQ(configuration.host, http_proxy_server.getHost());
ASSERT_EQ(configuration.port, http_proxy_server.getPort());
ASSERT_EQ(configuration.protocol, DB::ProxyConfiguration::protocolFromString(http_proxy_server.getScheme()));
}
TEST(EnvironmentProxyConfigurationResolver, TestHTTPNoEnv)
{
DB::EnvironmentProxyConfigurationResolver resolver(DB::ProxyConfiguration::Protocol::HTTP);
auto configuration = resolver.resolve();
ASSERT_EQ(configuration.host, "");
ASSERT_EQ(configuration.protocol, DB::ProxyConfiguration::Protocol::HTTP);
ASSERT_EQ(configuration.port, 0u);
}
TEST(EnvironmentProxyConfigurationResolver, TestHTTPs)
{
EnvironmentProxySetter setter({}, https_proxy_server);
DB::EnvironmentProxyConfigurationResolver resolver(DB::ProxyConfiguration::Protocol::HTTPS);
auto configuration = resolver.resolve();
ASSERT_EQ(configuration.host, https_proxy_server.getHost());
ASSERT_EQ(configuration.port, https_proxy_server.getPort());
ASSERT_EQ(configuration.protocol, DB::ProxyConfiguration::protocolFromString(https_proxy_server.getScheme()));
}
TEST(EnvironmentProxyConfigurationResolver, TestHTTPsNoEnv)
{
DB::EnvironmentProxyConfigurationResolver resolver(DB::ProxyConfiguration::Protocol::HTTPS);
auto configuration = resolver.resolve();
ASSERT_EQ(configuration.host, "");
ASSERT_EQ(configuration.protocol, DB::ProxyConfiguration::Protocol::HTTP);
ASSERT_EQ(configuration.port, 0u);
}
TEST(EnvironmentProxyConfigurationResolver, TestANYHTTP)
{
EnvironmentProxySetter setter(http_proxy_server, {});
DB::EnvironmentProxyConfigurationResolver resolver(DB::ProxyConfiguration::Protocol::ANY);
auto configuration = resolver.resolve();
ASSERT_EQ(configuration.host, http_proxy_server.getHost());
ASSERT_EQ(configuration.port, http_proxy_server.getPort());
ASSERT_EQ(configuration.protocol, DB::ProxyConfiguration::protocolFromString(http_proxy_server.getScheme()));
}
TEST(EnvironmentProxyConfigurationResolver, TestANYHTTPS)
{
EnvironmentProxySetter setter({}, https_proxy_server);
DB::EnvironmentProxyConfigurationResolver resolver(DB::ProxyConfiguration::Protocol::ANY);
auto configuration = resolver.resolve();
ASSERT_EQ(configuration.host, https_proxy_server.getHost());
ASSERT_EQ(configuration.port, https_proxy_server.getPort());
ASSERT_EQ(configuration.protocol, DB::ProxyConfiguration::protocolFromString(https_proxy_server.getScheme()));
}
TEST(EnvironmentProxyConfigurationResolver, TestANYNoEnv)
{
DB::EnvironmentProxyConfigurationResolver resolver(DB::ProxyConfiguration::Protocol::ANY);
auto configuration = resolver.resolve();
ASSERT_EQ(configuration.host, "");
ASSERT_EQ(configuration.protocol, DB::ProxyConfiguration::Protocol::HTTP);
ASSERT_EQ(configuration.port, 0u);
}

View File

@ -0,0 +1,26 @@
#include <gtest/gtest.h>
#include <Common/ProxyListConfigurationResolver.h>
#include <Poco/URI.h>
namespace
{
auto proxy_server1 = Poco::URI("http://proxy_server1:3128");
auto proxy_server2 = Poco::URI("http://proxy_server2:3128");
}
TEST(ProxyListConfigurationResolver, SimpleTest)
{
DB::ProxyListConfigurationResolver resolver({proxy_server1, proxy_server2});
auto configuration1 = resolver.resolve();
auto configuration2 = resolver.resolve();
ASSERT_EQ(configuration1.host, proxy_server1.getHost());
ASSERT_EQ(configuration1.port, proxy_server1.getPort());
ASSERT_EQ(configuration1.protocol, DB::ProxyConfiguration::protocolFromString(proxy_server1.getScheme()));
ASSERT_EQ(configuration2.host, proxy_server2.getHost());
ASSERT_EQ(configuration2.port, proxy_server2.getPort());
ASSERT_EQ(configuration2.protocol, DB::ProxyConfiguration::protocolFromString(proxy_server2.getScheme()));
}

View File

@ -92,7 +92,8 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
auth_settings.region,
RemoteHostFilter(), s3_max_redirects,
enable_s3_requests_logging,
/* for_disk_s3 = */ false, /* get_request_throttler = */ {}, /* put_request_throttler = */ {});
/* for_disk_s3 = */ false, /* get_request_throttler = */ {}, /* put_request_throttler = */ {},
new_uri.uri.getScheme());
client_configuration.endpointOverride = new_uri.endpoint;

View File

@ -1,25 +0,0 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <utility>
#include <base/types.h>
#include <IO/S3/PocoHTTPClient.h>
#include <Poco/URI.h>
namespace DB::S3
{
class ProxyConfiguration
{
public:
virtual ~ProxyConfiguration() = default;
/// Returns proxy configuration on each HTTP request.
virtual ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request) = 0;
virtual void errorReport(const ClientConfigurationPerRequest & config) = 0;
};
}
#endif

View File

@ -1,32 +0,0 @@
#include "ProxyListConfiguration.h"
#if USE_AWS_S3
#include <utility>
#include <Common/logger_useful.h>
namespace DB::S3
{
ProxyListConfiguration::ProxyListConfiguration(std::vector<Poco::URI> proxies_) : proxies(std::move(proxies_)), access_counter(0)
{
}
ClientConfigurationPerRequest ProxyListConfiguration::getConfiguration(const Aws::Http::HttpRequest &)
{
/// Avoid atomic increment if number of proxies is 1.
size_t index = proxies.size() > 1 ? (access_counter++) % proxies.size() : 0;
ClientConfigurationPerRequest cfg;
cfg.proxy_scheme = Aws::Http::SchemeMapper::FromString(proxies[index].getScheme().c_str());
cfg.proxy_host = proxies[index].getHost();
cfg.proxy_port = proxies[index].getPort();
LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Use proxy: {}", proxies[index].toString());
return cfg;
}
}
#endif

View File

@ -1,32 +0,0 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <atomic> // for std::atomic<size_t>
#include "ProxyConfiguration.h"
namespace DB::S3
{
/**
* For each request to S3 it chooses a proxy from the specified list using round-robin strategy.
*/
class ProxyListConfiguration : public ProxyConfiguration
{
public:
explicit ProxyListConfiguration(std::vector<Poco::URI> proxies_);
ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request) override;
void errorReport(const ClientConfigurationPerRequest &) override {}
private:
/// List of configured proxies.
const std::vector<Poco::URI> proxies;
/// Access counter to get proxy using round-robin strategy.
std::atomic<size_t> access_counter;
};
}
#endif

View File

@ -1,42 +0,0 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include "ProxyConfiguration.h"
#include <mutex>
namespace DB::S3
{
/**
* Proxy configuration where proxy host is obtained each time from specified endpoint.
* For each request to S3 it makes GET request to specified endpoint URL and reads proxy host from a response body.
* Specified scheme and port added to obtained proxy host to form completed proxy URL.
*/
class ProxyResolverConfiguration : public ProxyConfiguration
{
public:
ProxyResolverConfiguration(const Poco::URI & endpoint_, String proxy_scheme_, unsigned proxy_port_, unsigned cache_ttl_);
ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request) override;
void errorReport(const ClientConfigurationPerRequest & config) override;
private:
/// Endpoint to obtain a proxy host.
const Poco::URI endpoint;
/// Scheme for obtained proxy.
const String proxy_scheme;
/// Port for obtained proxy.
const unsigned proxy_port;
std::mutex cache_mutex;
bool cache_valid = false;
std::chrono::time_point<std::chrono::system_clock> cache_timestamp;
const std::chrono::seconds cache_ttl{0};
ClientConfigurationPerRequest cached_config;
};
}
#endif

View File

@ -5,6 +5,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/logger_useful.h>
#include <Common/Throttler.h>
#include <Common/ProxyConfigurationResolverProvider.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
@ -17,9 +18,6 @@
#include <Storages/StorageS3Settings.h>
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
#include <Disks/ObjectStorages/S3/ProxyConfiguration.h>
#include <Disks/ObjectStorages/S3/ProxyListConfiguration.h>
#include <Disks/ObjectStorages/S3/ProxyResolverConfiguration.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/DiskLocal.h>
#include <Common/Macros.h>
@ -44,76 +42,15 @@ std::unique_ptr<S3ObjectStorageSettings> getSettings(const Poco::Util::AbstractC
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000));
}
std::shared_ptr<S3::ProxyResolverConfiguration> getProxyResolverConfiguration(
const String & prefix, const Poco::Util::AbstractConfiguration & proxy_resolver_config)
{
auto endpoint = Poco::URI(proxy_resolver_config.getString(prefix + ".endpoint"));
auto proxy_scheme = proxy_resolver_config.getString(prefix + ".proxy_scheme");
if (proxy_scheme != "http" && proxy_scheme != "https")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Only HTTP/HTTPS schemas allowed in proxy resolver config: {}", proxy_scheme);
auto proxy_port = proxy_resolver_config.getUInt(prefix + ".proxy_port");
auto cache_ttl = proxy_resolver_config.getUInt(prefix + ".proxy_cache_time", 10);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Configured proxy resolver: {}, Scheme: {}, Port: {}",
endpoint.toString(), proxy_scheme, proxy_port);
return std::make_shared<S3::ProxyResolverConfiguration>(endpoint, proxy_scheme, proxy_port, cache_ttl);
}
std::shared_ptr<S3::ProxyListConfiguration> getProxyListConfiguration(
const String & prefix, const Poco::Util::AbstractConfiguration & proxy_config)
{
std::vector<String> keys;
proxy_config.keys(prefix, keys);
std::vector<Poco::URI> proxies;
for (const auto & key : keys)
if (startsWith(key, "uri"))
{
Poco::URI proxy_uri(proxy_config.getString(prefix + "." + key));
if (proxy_uri.getScheme() != "http" && proxy_uri.getScheme() != "https")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Only HTTP/HTTPS schemas allowed in proxy uri: {}", proxy_uri.toString());
if (proxy_uri.getHost().empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Empty host in proxy uri: {}", proxy_uri.toString());
proxies.push_back(proxy_uri);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Configured proxy: {}", proxy_uri.toString());
}
if (!proxies.empty())
return std::make_shared<S3::ProxyListConfiguration>(proxies);
return nullptr;
}
std::shared_ptr<S3::ProxyConfiguration> getProxyConfiguration(const String & prefix, const Poco::Util::AbstractConfiguration & config)
{
if (!config.has(prefix + ".proxy"))
return nullptr;
std::vector<String> config_keys;
config.keys(prefix + ".proxy", config_keys);
if (auto resolver_configs = std::count(config_keys.begin(), config_keys.end(), "resolver"))
{
if (resolver_configs > 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Multiple proxy resolver configurations aren't allowed");
return getProxyResolverConfiguration(prefix + ".proxy.resolver", config);
}
return getProxyListConfiguration(prefix + ".proxy", config);
}
std::unique_ptr<S3::Client> getClient(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const S3ObjectStorageSettings & settings)
{
String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
S3::URI uri(endpoint);
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
config.getString(config_prefix + ".region", ""),
context->getRemoteHostFilter(),
@ -121,10 +58,9 @@ std::unique_ptr<S3::Client> getClient(
context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
/* for_disk_s3 = */ true,
settings.request_settings.get_request_throttler,
settings.request_settings.put_request_throttler);
settings.request_settings.put_request_throttler,
uri.uri.getScheme());
String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
S3::URI uri(endpoint);
if (uri.key.back() != '/')
throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3 path must ends with '/', but '{}' doesn't.", uri.key);
@ -136,11 +72,14 @@ std::unique_ptr<S3::Client> getClient(
client_configuration.http_connection_pool_size = config.getUInt(config_prefix + ".http_connection_pool_size", 1000);
client_configuration.wait_on_pool_size_limit = false;
auto proxy_config = getProxyConfiguration(config_prefix, config);
/*
* Override proxy configuration for backwards compatibility with old configuration format.
* */
auto proxy_config = DB::ProxyConfigurationResolverProvider::getFromOldSettingsFormat(config_prefix, config);
if (proxy_config)
{
client_configuration.per_request_configuration
= [proxy_config](const auto & request) { return proxy_config->getConfiguration(request); };
= [proxy_config]() { return proxy_config->resolve(); };
client_configuration.error_report
= [proxy_config](const auto & request_config) { proxy_config->errorReport(request_config); };
}

View File

@ -137,7 +137,12 @@ namespace
throw Exception(ErrorCodes::UNSUPPORTED_URI_SCHEME, "Unsupported scheme in URI '{}'", uri.toString());
}
HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https, bool keep_alive)
HTTPSessionPtr makeHTTPSessionImpl(
const std::string & host,
UInt16 port,
bool https,
bool keep_alive,
Poco::Net::HTTPClientSession::ProxyConfig proxy_config = {})
{
HTTPSessionPtr session;
@ -158,6 +163,9 @@ namespace
/// doesn't work properly without patch
session->setKeepAlive(keep_alive);
session->setProxyConfig(proxy_config);
return session;
}
@ -333,13 +341,17 @@ void setResponseDefaultHeaders(HTTPServerResponse & response, size_t keep_alive_
response.set("Keep-Alive", "timeout=" + std::to_string(timeout.totalSeconds()));
}
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts)
HTTPSessionPtr makeHTTPSession(
const Poco::URI & uri,
const ConnectionTimeouts & timeouts,
Poco::Net::HTTPClientSession::ProxyConfig proxy_config
)
{
const std::string & host = uri.getHost();
UInt16 port = uri.getPort();
bool https = isHTTPS(uri);
auto session = makeHTTPSessionImpl(host, port, https, false);
auto session = makeHTTPSessionImpl(host, port, https, false, proxy_config);
setTimeouts(*session, timeouts);
return session;
}

View File

@ -69,7 +69,11 @@ void markSessionForReuse(PooledHTTPSessionPtr session);
void setResponseDefaultHeaders(HTTPServerResponse & response, size_t keep_alive_timeout);
/// Create session object to perform requests and set required parameters.
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts);
HTTPSessionPtr makeHTTPSession(
const Poco::URI & uri,
const ConnectionTimeouts & timeouts,
Poco::Net::HTTPClientSession::ProxyConfig proxy_config = {}
);
/// As previous method creates session, but tooks it from pool, without and with proxy uri.
PooledHTTPSessionPtr makePooledHTTPSession(

View File

@ -250,7 +250,8 @@ ReadWriteBufferFromHTTPBase<UpdatableSessionPtr>::ReadWriteBufferFromHTTPBase(
bool delay_initialization,
bool use_external_buffer_,
bool http_skip_not_found_url_,
std::optional<HTTPFileInfo> file_info_)
std::optional<HTTPFileInfo> file_info_,
Poco::Net::HTTPClientSession::ProxyConfig proxy_config_)
: SeekableReadBuffer(nullptr, 0)
, uri {uri_}
, method {!method_.empty() ? method_ : out_stream_callback_ ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}
@ -265,6 +266,7 @@ ReadWriteBufferFromHTTPBase<UpdatableSessionPtr>::ReadWriteBufferFromHTTPBase(
, http_skip_not_found_url(http_skip_not_found_url_)
, settings {settings_}
, log(&Poco::Logger::get("ReadWriteBufferFromHTTP"))
, proxy_config(proxy_config_)
{
if (settings.http_max_tries <= 0 || settings.http_retry_initial_backoff_ms <= 0
|| settings.http_retry_initial_backoff_ms >= settings.http_retry_max_backoff_ms)
@ -848,12 +850,12 @@ HTTPFileInfo ReadWriteBufferFromHTTPBase<UpdatableSessionPtr>::parseFileInfo(con
}
SessionFactory::SessionFactory(const ConnectionTimeouts & timeouts_)
: timeouts(timeouts_) {}
SessionFactory::SessionFactory(const ConnectionTimeouts & timeouts_, Poco::Net::HTTPClientSession::ProxyConfig proxy_config_)
: timeouts(timeouts_), proxy_config(proxy_config_) {}
SessionFactory::SessionType SessionFactory::buildNewSession(const Poco::URI & uri)
{
return makeHTTPSession(uri, timeouts);
return makeHTTPSession(uri, timeouts, proxy_config);
}
ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(
@ -870,9 +872,10 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(
bool delay_initialization_,
bool use_external_buffer_,
bool skip_not_found_url_,
std::optional<HTTPFileInfo> file_info_)
std::optional<HTTPFileInfo> file_info_,
Poco::Net::HTTPClientSession::ProxyConfig proxy_config_)
: Parent(
std::make_shared<SessionType>(uri_, max_redirects, std::make_shared<SessionFactory>(timeouts)),
std::make_shared<SessionType>(uri_, max_redirects, std::make_shared<SessionFactory>(timeouts, proxy_config_)),
uri_,
credentials_,
method_,
@ -884,7 +887,8 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(
delay_initialization_,
use_external_buffer_,
skip_not_found_url_,
file_info_) {}
file_info_,
proxy_config_) {}
PooledSessionFactory::PooledSessionFactory(

View File

@ -111,6 +111,8 @@ namespace detail
ReadSettings settings;
Poco::Logger * log;
Poco::Net::HTTPClientSession::ProxyConfig proxy_config;
bool withPartialContent(const HTTPRange & range) const;
size_t getOffset() const;
@ -161,7 +163,8 @@ namespace detail
bool delay_initialization = false,
bool use_external_buffer_ = false,
bool http_skip_not_found_url_ = false,
std::optional<HTTPFileInfo> file_info_ = std::nullopt);
std::optional<HTTPFileInfo> file_info_ = std::nullopt,
Poco::Net::HTTPClientSession::ProxyConfig proxy_config_ = {});
void callWithRedirects(Poco::Net::HTTPResponse & response, const String & method_, bool throw_on_all_errors = false, bool for_object_info = false);
@ -212,13 +215,14 @@ namespace detail
class SessionFactory
{
public:
explicit SessionFactory(const ConnectionTimeouts & timeouts_);
explicit SessionFactory(const ConnectionTimeouts & timeouts_, Poco::Net::HTTPClientSession::ProxyConfig proxy_config_ = {});
using SessionType = HTTPSessionPtr;
SessionType buildNewSession(const Poco::URI & uri);
private:
ConnectionTimeouts timeouts;
Poco::Net::HTTPClientSession::ProxyConfig proxy_config;
};
class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatableSession<SessionFactory>>>
@ -241,7 +245,8 @@ public:
bool delay_initialization_ = true,
bool use_external_buffer_ = false,
bool skip_not_found_url_ = false,
std::optional<HTTPFileInfo> file_info_ = std::nullopt);
std::optional<HTTPFileInfo> file_info_ = std::nullopt,
Poco::Net::HTTPClientSession::ProxyConfig proxy_config_ = {});
};
class PooledSessionFactory

View File

@ -24,6 +24,8 @@
#include <Common/assert_cast.h>
#include <Common/logger_useful.h>
#include <Common/ProxyConfigurationResolverProvider.h>
namespace ProfileEvents
{
@ -861,16 +863,28 @@ PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT
bool enable_s3_requests_logging,
bool for_disk_s3,
const ThrottlerPtr & get_request_throttler,
const ThrottlerPtr & put_request_throttler)
const ThrottlerPtr & put_request_throttler,
const String & protocol)
{
return PocoHTTPClientConfiguration(
auto proxy_configuration_resolver = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::protocolFromString(protocol));
auto per_request_configuration = [=] () { return proxy_configuration_resolver->resolve(); };
auto error_report = [=] (const DB::ProxyConfiguration & req) { proxy_configuration_resolver->errorReport(req); };
auto config = PocoHTTPClientConfiguration(
per_request_configuration,
force_region,
remote_host_filter,
s3_max_redirects,
enable_s3_requests_logging,
for_disk_s3,
get_request_throttler,
put_request_throttler);
put_request_throttler,
error_report);
config.scheme = Aws::Http::SchemeMapper::FromString(protocol.c_str());
return config;
}
}

View File

@ -314,7 +314,8 @@ public:
bool enable_s3_requests_logging,
bool for_disk_s3,
const ThrottlerPtr & get_request_throttler,
const ThrottlerPtr & put_request_throttler);
const ThrottlerPtr & put_request_throttler,
const String & protocol = "https");
private:
ClientFactory();

View File

@ -532,13 +532,13 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
configuration.enable_s3_requests_logging,
configuration.for_disk_s3,
configuration.get_request_throttler,
configuration.put_request_throttler);
configuration.put_request_throttler,
Aws::Http::SchemeMapper::ToString(Aws::Http::Scheme::HTTP));
/// See MakeDefaultHttpResourceClientConfiguration().
/// This is part of EC2 metadata client, but unfortunately it can't be accessed from outside
/// of contrib/aws/aws-cpp-sdk-core/source/internal/AWSHttpResourceClient.cpp
aws_client_configuration.maxConnections = 2;
aws_client_configuration.scheme = Aws::Http::Scheme::HTTP;
/// Explicitly set the proxy settings to empty/zero to avoid relying on defaults that could potentially change
/// in the future.

View File

@ -85,20 +85,24 @@ namespace DB::S3
{
PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
std::function<DB::ProxyConfiguration()> per_request_configuration_,
const String & force_region_,
const RemoteHostFilter & remote_host_filter_,
unsigned int s3_max_redirects_,
bool enable_s3_requests_logging_,
bool for_disk_s3_,
const ThrottlerPtr & get_request_throttler_,
const ThrottlerPtr & put_request_throttler_)
: force_region(force_region_)
const ThrottlerPtr & put_request_throttler_,
std::function<void(const DB::ProxyConfiguration &)> error_report_)
: per_request_configuration(per_request_configuration_)
, force_region(force_region_)
, remote_host_filter(remote_host_filter_)
, s3_max_redirects(s3_max_redirects_)
, enable_s3_requests_logging(enable_s3_requests_logging_)
, for_disk_s3(for_disk_s3_)
, get_request_throttler(get_request_throttler_)
, put_request_throttler(put_request_throttler_)
, error_report(error_report_)
{
}
@ -262,8 +266,8 @@ void PocoHTTPClient::makeRequestInternal(
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const
{
/// Most sessions in pool are already connected and it is not possible to set proxy host/port to a connected session.
const auto request_configuration = per_request_configuration(request);
if (http_connection_pool_size && request_configuration.proxy_host.empty())
const auto request_configuration = per_request_configuration();
if (http_connection_pool_size && request_configuration.host.empty())
makeRequestInternalImpl<true>(request, request_configuration, response, readLimiter, writeLimiter);
else
makeRequestInternalImpl<false>(request, request_configuration, response, readLimiter, writeLimiter);
@ -272,7 +276,7 @@ void PocoHTTPClient::makeRequestInternal(
template <bool pooled>
void PocoHTTPClient::makeRequestInternalImpl(
Aws::Http::HttpRequest & request,
const ClientConfigurationPerRequest & request_configuration,
const DB::ProxyConfiguration & request_configuration,
std::shared_ptr<PocoHTTPResponse> & response,
Aws::Utils::RateLimits::RateLimiterInterface *,
Aws::Utils::RateLimits::RateLimiterInterface *) const
@ -327,7 +331,7 @@ void PocoHTTPClient::makeRequestInternalImpl(
Poco::URI target_uri(uri);
SessionPtr session;
if (!request_configuration.proxy_host.empty())
if (!request_configuration.host.empty())
{
if (enable_s3_requests_logging)
LOG_TEST(log, "Due to reverse proxy host name ({}) won't be resolved on ClickHouse side", uri);
@ -339,12 +343,12 @@ void PocoHTTPClient::makeRequestInternalImpl(
target_uri, timeouts, http_connection_pool_size, wait_on_pool_size_limit);
else
session = makeHTTPSession(target_uri, timeouts);
bool use_tunnel = request_configuration.proxy_scheme == Aws::Http::Scheme::HTTP && target_uri.getScheme() == "https";
bool use_tunnel = request_configuration.protocol == DB::ProxyConfiguration::Protocol::HTTP && target_uri.getScheme() == "https";
session->setProxy(
request_configuration.proxy_host,
request_configuration.proxy_port,
Aws::Http::SchemeMapper::ToString(request_configuration.proxy_scheme),
request_configuration.host,
request_configuration.port,
DB::ProxyConfiguration::protocolToString(request_configuration.protocol),
use_tunnel
);
}

View File

@ -9,6 +9,7 @@
#include <Common/RemoteHostFilter.h>
#include <Common/Throttler_fwd.h>
#include <Common/ProxyConfiguration.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/HTTPCommon.h>
#include <IO/HTTPHeaderEntries.h>
@ -34,16 +35,9 @@ namespace DB::S3
{
class ClientFactory;
struct ClientConfigurationPerRequest
{
Aws::Http::Scheme proxy_scheme = Aws::Http::Scheme::HTTPS;
String proxy_host;
unsigned proxy_port = 0;
};
struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
{
std::function<ClientConfigurationPerRequest(const Aws::Http::HttpRequest &)> per_request_configuration = [] (const Aws::Http::HttpRequest &) { return ClientConfigurationPerRequest(); };
std::function<DB::ProxyConfiguration()> per_request_configuration;
String force_region;
const RemoteHostFilter & remote_host_filter;
unsigned int s3_max_redirects;
@ -62,17 +56,19 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
void updateSchemeAndRegion();
std::function<void(const ClientConfigurationPerRequest &)> error_report;
std::function<void(const DB::ProxyConfiguration &)> error_report;
private:
PocoHTTPClientConfiguration(
std::function<DB::ProxyConfiguration()> per_request_configuration_,
const String & force_region_,
const RemoteHostFilter & remote_host_filter_,
unsigned int s3_max_redirects_,
bool enable_s3_requests_logging_,
bool for_disk_s3_,
const ThrottlerPtr & get_request_throttler_,
const ThrottlerPtr & put_request_throttler_
const ThrottlerPtr & put_request_throttler_,
std::function<void(const DB::ProxyConfiguration &)> error_report_
);
/// Constructor of Aws::Client::ClientConfiguration must be called after AWS SDK initialization.
@ -165,7 +161,7 @@ private:
template <bool pooled>
void makeRequestInternalImpl(
Aws::Http::HttpRequest & request,
const ClientConfigurationPerRequest & per_request_configuration,
const DB::ProxyConfiguration & per_request_configuration,
std::shared_ptr<PocoHTTPResponse> & response,
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const;
@ -174,8 +170,8 @@ protected:
static S3MetricKind getMetricKind(const Aws::Http::HttpRequest & request);
void addMetric(const Aws::Http::HttpRequest & request, S3MetricType type, ProfileEvents::Count amount = 1) const;
std::function<ClientConfigurationPerRequest(const Aws::Http::HttpRequest &)> per_request_configuration;
std::function<void(const ClientConfigurationPerRequest &)> error_report;
std::function<DB::ProxyConfiguration()> per_request_configuration;
std::function<void(const DB::ProxyConfiguration &)> error_report;
ConnectionTimeouts timeouts;
const RemoteHostFilter & remote_host_filter;
unsigned int s3_max_redirects;

View File

@ -26,9 +26,19 @@
#include <IO/S3/Client.h>
#include <IO/HTTPHeaderEntries.h>
#include <Storages/StorageS3Settings.h>
#include <Poco/Util/ServerApplication.h>
#include "TestPocoHTTPServer.h"
/*
* When all tests are executed together, `Context::getGlobalContextInstance()` is not null. Global context is used by
* ProxyResolvers to get proxy configuration (used by S3 clients). If global context does not have a valid ConfigRef, it relies on
* Poco::Util::Application::instance() to grab the config. However, at this point, the application is not yet initialized and
* `Poco::Util::Application::instance()` returns nullptr. This causes the test to fail. To fix this, we create a dummy application that takes
* care of initialization.
* */
[[maybe_unused]] static Poco::Util::ServerApplication app;
class NoRetryStrategy : public Aws::Client::StandardRetryStrategy
{
@ -125,7 +135,8 @@ void testServerSideEncryption(
enable_s3_requests_logging,
/* for_disk_s3 = */ false,
/* get_request_throttler = */ {},
/* put_request_throttler = */ {}
/* put_request_throttler = */ {},
uri.uri.getScheme()
);
client_configuration.endpointOverride = uri.endpoint;

View File

@ -13,9 +13,10 @@ WriteBufferFromHTTP::WriteBufferFromHTTP(
const std::string & content_encoding,
const HTTPHeaderEntries & additional_headers,
const ConnectionTimeouts & timeouts,
size_t buffer_size_)
size_t buffer_size_,
Poco::Net::HTTPClientSession::ProxyConfig proxy_configuration)
: WriteBufferFromOStream(buffer_size_)
, session{makeHTTPSession(uri, timeouts)}
, session{makeHTTPSession(uri, timeouts, proxy_configuration)}
, request{method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1}
{
request.setHost(uri.getHost());

View File

@ -25,7 +25,8 @@ public:
const std::string & content_encoding = "",
const HTTPHeaderEntries & additional_headers = {},
const ConnectionTimeouts & timeouts = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
Poco::Net::HTTPClientSession::ProxyConfig proxy_configuration = {});
private:
/// Receives response from the server after sending all data.

View File

@ -1162,7 +1162,8 @@ void StorageS3::Configuration::connect(ContextPtr context)
context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
/* for_disk_s3 = */ false,
request_settings.get_request_throttler,
request_settings.put_request_throttler);
request_settings.put_request_throttler,
url.uri.getScheme());
client_configuration.endpointOverride = url.endpoint;
client_configuration.maxConnections = static_cast<unsigned>(request_settings.max_connections);

View File

@ -28,6 +28,7 @@
#include <Common/ThreadStatus.h>
#include <Common/parseRemoteDescription.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Common/ProxyConfigurationResolverProvider.h>
#include <Common/ProfileEvents.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/HTTPHeaderEntries.h>
@ -161,6 +162,26 @@ namespace
{
return parseRemoteDescription(uri, 0, uri.size(), '|', max_addresses);
}
auto proxyConfigurationToPocoProxyConfiguration(const ProxyConfiguration & proxy_configuration)
{
Poco::Net::HTTPClientSession::ProxyConfig poco_proxy_config;
poco_proxy_config.host = proxy_configuration.host;
poco_proxy_config.port = proxy_configuration.port;
poco_proxy_config.protocol = ProxyConfiguration::protocolToString(proxy_configuration.protocol);
return poco_proxy_config;
}
auto getProxyConfiguration(const std::string & protocol_string)
{
auto protocol = protocol_string == "https" ? ProxyConfigurationResolver::Protocol::HTTPS
: ProxyConfigurationResolver::Protocol::HTTP;
auto proxy_config = ProxyConfigurationResolverProvider::get(protocol)->resolve();
return proxyConfigurationToPocoProxyConfiguration(proxy_config);
}
}
class StorageURLSource::DisclosedGlobIterator::Impl
@ -402,6 +423,8 @@ std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> StorageURLSource:
const auto settings = context->getSettings();
auto proxy_config = getProxyConfiguration(http_method);
try
{
auto res = std::make_unique<ReadWriteBufferFromHTTP>(
@ -417,7 +440,9 @@ std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> StorageURLSource:
&context->getRemoteHostFilter(),
delay_initialization,
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error);
/* skip_url_not_found_error */ skip_url_not_found_error,
/* file_info */ std::nullopt,
proxy_config);
if (context->getSettingsRef().engine_url_skip_empty_files && res->eof() && option != std::prev(end))
{
@ -464,10 +489,17 @@ StorageURLSink::StorageURLSink(
std::string content_type = FormatFactory::instance().getContentType(format, context, format_settings);
std::string content_encoding = toContentEncodingName(compression_method);
auto proxy_config = getProxyConfiguration(http_method);
auto write_buffer = std::make_unique<WriteBufferFromHTTP>(
Poco::URI(uri), http_method, content_type, content_encoding, headers, timeouts, DBMS_DEFAULT_BUFFER_SIZE, proxy_config
);
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromHTTP>(Poco::URI(uri), http_method, content_type, content_encoding, headers, timeouts),
std::move(write_buffer),
compression_method,
3);
3
);
writer = FormatFactory::instance().getOutputFormat(format, *write_buf, sample_block, context, format_settings);
}
@ -948,8 +980,12 @@ std::optional<time_t> IStorageURLBase::getLastModificationTime(
try
{
auto uri = Poco::URI(url);
auto proxy_config = getProxyConfiguration(uri.getScheme());
ReadWriteBufferFromHTTP buf(
Poco::URI(url),
uri,
Poco::Net::HTTPRequest::HTTP_GET,
{},
getHTTPTimeouts(context),
@ -961,7 +997,9 @@ std::optional<time_t> IStorageURLBase::getLastModificationTime(
&context->getRemoteHostFilter(),
true,
false,
false);
false,
std::nullopt,
proxy_config);
return buf.getLastModificationTime();
}

View File

@ -0,0 +1,88 @@
import os
import time
def check_proxy_logs(
cluster, proxy_instance, protocol, bucket, http_methods={"POST", "PUT", "GET"}
):
for i in range(10):
logs = cluster.get_container_logs(proxy_instance)
# Check with retry that all possible interactions with Minio are present
for http_method in http_methods:
if (
logs.find(http_method + f" {protocol}://minio1:9001/root/data/{bucket}")
>= 0
):
return
time.sleep(1)
else:
assert False, f"{http_methods} method not found in logs of {proxy_instance}"
def wait_resolver(cluster):
for i in range(10):
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
[
"curl",
"-s",
f"http://resolver:8080/hostname",
],
nothrow=True,
)
if response == "proxy1":
return
time.sleep(i)
else:
assert False, "Resolver is not up"
# Runs simple proxy resolver in python env container.
def run_resolver(cluster, current_dir):
container_id = cluster.get_container_id("resolver")
cluster.copy_file_to_container(
container_id,
os.path.join(current_dir, "proxy-resolver", "resolver.py"),
"resolver.py",
)
cluster.exec_in_container(container_id, ["python", "resolver.py"], detach=True)
wait_resolver(cluster)
def build_s3_endpoint(protocol, bucket):
return f"{protocol}://minio1:9001/root/data/{bucket}/test.csv"
def perform_simple_queries(node, minio_endpoint):
node.query(
f"""
INSERT INTO FUNCTION
s3('{minio_endpoint}', 'minio', 'minio123', 'CSV', 'key String, value String')
VALUES ('color','red'),('size','10')
"""
)
assert (
node.query(
f"SELECT * FROM s3('{minio_endpoint}', 'minio', 'minio123', 'CSV') FORMAT Values"
)
== "('color','red'),('size','10')"
)
assert (
node.query(
f"SELECT * FROM s3('{minio_endpoint}', 'minio', 'minio123', 'CSV') FORMAT Values"
)
== "('color','red'),('size','10')"
)
def simple_test(cluster, proxies, protocol, bucket):
minio_endpoint = build_s3_endpoint(protocol, bucket)
node = cluster.instances[f"{bucket}"]
perform_simple_queries(node, minio_endpoint)
for proxy in proxies:
check_proxy_logs(cluster, proxy, protocol, bucket)

View File

@ -0,0 +1,8 @@
<clickhouse>
<proxy>
<http>
<uri>http://proxy1</uri>
<uri>http://proxy2</uri>
</http>
</proxy>
</clickhouse>

View File

@ -0,0 +1,21 @@
<clickhouse>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,7 @@
<clickhouse>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<max_concurrent_queries>500</max_concurrent_queries>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
</clickhouse>

View File

@ -0,0 +1,66 @@
import logging
import time
import pytest
from helpers.cluster import ClickHouseCluster
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=[
"configs/config.d/storage_conf.xml",
"configs/config.d/proxy_list.xml",
],
with_minio=True,
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def check_proxy_logs(cluster, proxy_instance, http_methods={"POST", "PUT", "GET"}):
for i in range(10):
logs = cluster.get_container_logs(proxy_instance)
# Check with retry that all possible interactions with Minio are present
for http_method in http_methods:
if logs.find(http_method + " http://minio1") >= 0:
return
time.sleep(1)
else:
assert False, f"{http_methods} method not found in logs of {proxy_instance}"
@pytest.mark.parametrize("policy", ["s3"])
def test_s3_with_proxy_list(cluster, policy):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE s3_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='{}'
""".format(
policy
)
)
node.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
assert (
node.query("SELECT * FROM s3_test order by id FORMAT Values")
== "(0,'data'),(1,'data')"
)
node.query("DROP TABLE IF EXISTS s3_test SYNC")
for proxy in ["proxy1", "proxy2"]:
check_proxy_logs(cluster, proxy, ["PUT", "GET"])

View File

@ -0,0 +1,5 @@
<clickhouse>
<profiles>
<default/>
</profiles>
</clickhouse>

View File

@ -0,0 +1,7 @@
<clickhouse>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<max_concurrent_queries>500</max_concurrent_queries>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
</clickhouse>

View File

@ -0,0 +1,14 @@
import random
import bottle
@bottle.route("/hostname")
def index():
if random.randrange(2) == 0:
return "proxy1"
else:
return "proxy2"
bottle.run(host="0.0.0.0", port=8080)

View File

@ -4,18 +4,7 @@ import time
import pytest
from helpers.cluster import ClickHouseCluster
# Runs simple proxy resolver in python env container.
def run_resolver(cluster):
container_id = cluster.get_container_id("resolver")
current_dir = os.path.dirname(__file__)
cluster.copy_file_to_container(
container_id,
os.path.join(current_dir, "proxy-resolver", "resolver.py"),
"resolver.py",
)
cluster.exec_in_container(container_id, ["python", "resolver.py"], detach=True)
import helpers.s3_url_proxy_tests_util as proxy_util
@pytest.fixture(scope="module")
@ -29,7 +18,7 @@ def cluster():
cluster.start()
logging.info("Cluster started")
run_resolver(cluster)
proxy_util.run_resolver(cluster, os.path.dirname(__file__))
logging.info("Proxy resolver started")
yield cluster
@ -46,7 +35,7 @@ def check_proxy_logs(cluster, proxy_instance, http_methods={"POST", "PUT", "GET"
return
time.sleep(1)
else:
assert False, "http method not found in logs"
assert False, f"{http_methods} method not found in logs of {proxy_instance}"
@pytest.mark.parametrize("policy", ["s3", "s3_with_resolver"])
@ -65,7 +54,6 @@ def test_s3_with_proxy_list(cluster, policy):
policy
)
)
node.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
assert (
node.query("SELECT * FROM s3_test order by id FORMAT Values")

View File

@ -0,0 +1,8 @@
<clickhouse>
<proxy>
<http>
<uri>http://proxy1</uri>
<uri>http://proxy2</uri>
</http>
</proxy>
</clickhouse>

View File

@ -0,0 +1,15 @@
<clickhouse>
<proxy>
<!--
At each interaction with S3 resolver sends empty GET request to specified endpoint URL to obtain proxy host.
Proxy host is returned as string in response body.
Then S3 client uses proxy URL formed as proxy_scheme://proxy_host:proxy_port to make request.
-->
<resolver>
<endpoint>http://resolver:8080/hostname</endpoint>
<proxy_scheme>http</proxy_scheme>
<proxy_port>80</proxy_port>
<proxy_cache_time>10</proxy_cache_time>
</resolver>
</proxy>
</clickhouse>

View File

@ -0,0 +1,11 @@
import random
import bottle
@bottle.route("/hostname")
def index():
return "proxy1"
bottle.run(host="0.0.0.0", port=8080)

View File

@ -0,0 +1,59 @@
import logging
import helpers.s3_url_proxy_tests_util as proxy_util
import os
import pytest
from helpers.cluster import ClickHouseCluster
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"remote_proxy_node",
main_configs=[
"configs/config.d/proxy_remote.xml",
],
with_minio=True,
)
cluster.add_instance(
"proxy_list_node",
main_configs=[
"configs/config.d/proxy_list.xml",
],
with_minio=True,
)
cluster.add_instance(
"env_node",
with_minio=True,
env_variables={
"http_proxy": "http://proxy1",
},
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
proxy_util.run_resolver(cluster, os.path.dirname(__file__))
logging.info("Proxy resolver started")
yield cluster
finally:
cluster.shutdown()
def test_s3_with_http_proxy_list(cluster):
proxy_util.simple_test(cluster, ["proxy1", "proxy2"], "http", "proxy_list_node")
def test_s3_with_http_remote_proxy(cluster):
proxy_util.simple_test(cluster, ["proxy1"], "http", "remote_proxy_node")
def test_s3_with_http_env_proxy(cluster):
proxy_util.simple_test(cluster, ["proxy1"], "http", "env_node")

View File

@ -0,0 +1,12 @@
<clickhouse>
<proxy>
<http>
<uri>http://proxy1</uri>
<uri>http://proxy2</uri>
</http>
<https>
<uri>https://proxy1</uri>
<uri>https://proxy2</uri>
</https>
</proxy>
</clickhouse>

View File

@ -0,0 +1,22 @@
<clickhouse>
<proxy>
<!--
At each interaction with S3 resolver sends empty GET request to specified endpoint URL to obtain proxy host.
Proxy host is returned as string in response body.
Then S3 client uses proxy URL formed as proxy_scheme://proxy_host:proxy_port to make request.
-->
<resolver>
<endpoint>http://resolver:8080/hostname</endpoint>
<proxy_scheme>http</proxy_scheme>
<proxy_port>80</proxy_port>
<proxy_cache_time>10</proxy_cache_time>
</resolver>
<resolver>
<endpoint>http://resolver:8080/hostname</endpoint>
<proxy_scheme>https</proxy_scheme>
<proxy_port>443</proxy_port>
<proxy_cache_time>10</proxy_cache_time>
</resolver>
</proxy>
</clickhouse>

View File

@ -0,0 +1,11 @@
<clickhouse>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
</clickhouse>

View File

@ -0,0 +1,5 @@
<clickhouse>
<profiles>
<default/>
</profiles>
</clickhouse>

View File

@ -0,0 +1,18 @@
-----BEGIN CERTIFICATE-----
MIIC+TCCAeGgAwIBAgIQfF4j70ZdR/W3XlFJq5iZgDANBgkqhkiG9w0BAQsFADAS
MRAwDgYDVQQKEwdBY21lIENvMB4XDTIwMDcwOTE1MTQ1M1oXDTIxMDcwOTE1MTQ1
M1owEjEQMA4GA1UEChMHQWNtZSBDbzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC
AQoCggEBAM4i2tOlbbDxcvckVK/Zms95n2ipr7dZ0qToSf8qmF5d2EH6mqC0Vv2d
MJ+8JhQEKBh8AvUjmSqjd8tWmLJcqA84Gc8s8stB565wwkaMBvMExKlO+PQtynRd
xZjQVnj16hB0ZP4JHeVUOqMQa7uPQZQp6kqdkJ3u84EhRmU8fCCtUPOJIYHcfx7P
ScYfmJCpmqxrfWP18XcyYlhoCTm/nV+XT+XfUGwc6Sok5pCX5C70PiQ5MrEvYDIC
Q3iRNi2Lj4pTG8GUSwAcKLB08o7mxHvR1MGDGohtGnSAhdniK9aljNmBQfNIErFI
3529YDMW/qwRKSEkJpMy7r8RkfYamUsCAwEAAaNLMEkwDgYDVR0PAQH/BAQDAgKk
MBMGA1UdJQQMMAoGCCsGAQUFBwMBMA8GA1UdEwEB/wQFMAMBAf8wEQYDVR0RBAow
CIIGbWluaW8xMA0GCSqGSIb3DQEBCwUAA4IBAQDAlDKuJfQHzsBtFioNnf9VS+LA
m/aUG7ngl0Orynef45Kl21l1ToM0feKk1UnEN8crwE0wjZttby84sYaq0us7mnbl
CnFsvphNqk5jxMbSs/8Vx4TVEimyO7r5JeG4J9lEnAu2hKK5ZlwPzj7G8bL4fOvk
OGiiP5r0K3wTVU/Y96MmDUaJwBNiyp7WtsBRzkteSPQJDC98gUCYeYsIFokUs3gz
ILOAbGQBLKUn9kmYc+/LLNha0nsC0eQGmLaJgIYfele63c6KkklQ3ePjRZ71JfmN
TulovRrwUf0J4hYcIgC1URZbClsnQDOBFCY6Lm8eI+IGNWWU4I9WGoJ1Lkvk
-----END CERTIFICATE-----

View File

@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC9ORgaBCx42ejp
9PSjc0uvwH/hTB6yZvZB4S+wxbzzfeKomX/JBcFHmGCIJJVjVV0rafv3vw+9f9u4
wrZpN4HZKnVyz3mBXEA1WDvLTLV8n8zVyso1qbnfF9Fa8wnk89b0xGWyM7jie7/c
TIGMrgm7hIPaM2zDzFwIfIAqZ1AexC4vADIffF9rcFLLjNHuv1uAc32jdfQEPluv
mBMzGkz254+MabxZWIZjkYn70kNSZDoyFmMGafBtkRTUPNq2+fGv/eLJ9Lxm3153
Ja0sCyzLlEo9+/z4ERqM5zwWre4vcwfO63c5pcSCzGw84teTpmDwSyiSR70TYJdt
BGQqZvLZAgMBAAECggEANe8oJ4I5CtlRwh3H/S7Hy/iaeqUvuroORwjghwpVqTGg
gV3/RlUVmkqceTG0QvP58n3rC9qxqdnfzvHw/FyN7lBj2a25fF3HD21u3aunrzX9
NJLwwAr4p9YqHjpX/6JhCrNQKVMEx8luDmTgKDETJRfIXVF7FvQQ53pVLcD03U+g
MgN61HBzfT5L0TLHoiKNQbVi+Wm1gw3zvb/a9Z1rULRZfIuKGM0bNNqRZt4rUUAV
QicklDR0Qv59jhr5Y/zjinKkqF8qudvUkaNT2JH1DLfXiAhuC0OQugMjYzNntQB4
hMhkqARnjuk/WPMvnXivnqx9o69BL5wyXIj3vD4fgQKBgQDVKaXAZJ5bo3VfcpLm
cyjtUuOzAxLU1bVGI0Hm1ARqeGVxSTypZLSX8xFi2n5Bvbgh/Y60aEac/1uKoXA9
gej1MT4hKpXyagrARx97E8zk5nf88kVxkiKUrifMjP2lDzHIYhdKk9R3SiV6gWvA
FoJtjBwFhJ6uWUPyry4nqFSENQKBgQDjP9k6CTZF0EnDqbADiQr7VKpebqhtLWRD
U0bQh/l57VrWqGksVOlivIJChP49q1H+hQ1YgfKIEDag8JJnf/inUSpVsw1ljAjv
knqNzn0Gdd9lTsiNGgqlCjhmWedkh4eO8uau479TwQc6gB4PQdLAFynQtt8Kk45P
GxdpRx4AlQKBgQCgxUGbYwhBC37aF1sObqrenBbajCXm2qxXEv6Ab0ZJWzb/g4I6
LJc8x3pEeZCiWsoG8Otxy/f+L2bGn049Rb8DNzmp4Cmp5SrorHvk4yE1P1IeOEgC
CXsFcnjYATrJBDXC8aCpgefMdOLhi71N6mxC3VrBGq5nxzHFVzTTelUMRQKBgQDa
yekhiCb5liy+tcuhy7qH+Z7BpjaATrh+XVoLgS5+5jeT/basmN/OUQH0e0iwJRaf
Poh30zynJT0DPDsobLwAkxN4SRg30Vf1GAjoKIqUwr2fMvfBafYfqbRdTmeKkTXB
OjlA3kKhp3GHMDxAojX+/Q4kRTx+WUwk+0dR88d99QKBgEiYrkSLjKXUFllDmVyp
HtlYKZiq5c33DA06SA2uVOprCdTbnbvP4WrgUsLGvqBcaPEd06fGGbvJWwUdnkXM
HNAkqSeUe5ueovidtoPdF+aPyxdGg3Z8551xOoHZFYrvgdZ4YMPcJrwQQsvWCcYP
GDnSoD8Xjd2LmekTpDBt5ZVz
-----END PRIVATE KEY-----

View File

@ -0,0 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIDBTCCAe2gAwIBAgIRANb2pr4HgR8YFwKNJMUSWiIwDQYJKoZIhvcNAQELBQAw
EjEQMA4GA1UEChMHQWNtZSBDbzAeFw0yMDA3MDkxODE1MDBaFw0yMTA3MDkxODE1
MDBaMBIxEDAOBgNVBAoTB0FjbWUgQ28wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw
ggEKAoIBAQC9ORgaBCx42ejp9PSjc0uvwH/hTB6yZvZB4S+wxbzzfeKomX/JBcFH
mGCIJJVjVV0rafv3vw+9f9u4wrZpN4HZKnVyz3mBXEA1WDvLTLV8n8zVyso1qbnf
F9Fa8wnk89b0xGWyM7jie7/cTIGMrgm7hIPaM2zDzFwIfIAqZ1AexC4vADIffF9r
cFLLjNHuv1uAc32jdfQEPluvmBMzGkz254+MabxZWIZjkYn70kNSZDoyFmMGafBt
kRTUPNq2+fGv/eLJ9Lxm3153Ja0sCyzLlEo9+/z4ERqM5zwWre4vcwfO63c5pcSC
zGw84teTpmDwSyiSR70TYJdtBGQqZvLZAgMBAAGjVjBUMA4GA1UdDwEB/wQEAwIC
pDATBgNVHSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MBwGA1UdEQQV
MBOCBm1pbmlvMYIJbG9jYWxob3N0MA0GCSqGSIb3DQEBCwUAA4IBAQAKU2LhvFFz
RFfUibt/WTj3rtUfKEBrQuUOYt2A8MTbC8pyEu+UJASTzunluUFze5zchEm1s3pZ
YRLcNwbJqLE6CzUxQ9b2iUhaeWuKrx4ZoPkY0uGiaXM/iKfVKTuNmhF2Sf/P4xUE
Pt19yQjpIhcicWQc37BBQFvnvy+n5wgHa/pgl1+QUvAa/fwYhF9S28xRLESzZepm
NMYysopV+YMaxcFa9SH44toXtXnvRWwVdEorlq1W3/AiJg8hDPzSa9UXLMjA968J
ONtn3qvwac9Ot53+QsXJdsMmDZLWGCi6I1w0ZQetpr/0ubaA1F3GdK9eB/S0thqU
l2VUgn3c/kKS
-----END CERTIFICATE-----

View File

@ -0,0 +1,11 @@
import random
import bottle
@bottle.route("/hostname")
def index():
return "proxy1"
bottle.run(host="0.0.0.0", port=8080)

View File

@ -0,0 +1,67 @@
import logging
import helpers.s3_url_proxy_tests_util as proxy_util
import os
import pytest
from helpers.cluster import ClickHouseCluster
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
# minio_certs_dir is set only once and used by all instances
cluster.add_instance(
"remote_proxy_node",
main_configs=[
"configs/config.d/proxy_remote.xml",
"configs/config.d/ssl.xml",
],
with_minio=True,
minio_certs_dir="minio_certs",
)
cluster.add_instance(
"proxy_list_node",
main_configs=[
"configs/config.d/proxy_list.xml",
"configs/config.d/ssl.xml",
],
with_minio=True,
)
cluster.add_instance(
"env_node",
main_configs=[
"configs/config.d/ssl.xml",
],
with_minio=True,
env_variables={
"https_proxy": "https://proxy1",
},
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
proxy_util.run_resolver(cluster, os.path.dirname(__file__))
logging.info("Proxy resolver started")
yield cluster
finally:
cluster.shutdown()
def test_s3_with_https_proxy_list(cluster):
proxy_util.simple_test(cluster, ["proxy1", "proxy2"], "https", "proxy_list_node")
def test_s3_with_https_remote_proxy(cluster):
proxy_util.simple_test(cluster, ["proxy1"], "https", "remote_proxy_node")
def test_s3_with_https_env_proxy(cluster):
proxy_util.simple_test(cluster, ["proxy1"], "https", "env_node")

View File

@ -0,0 +1,7 @@
<clickhouse>
<proxy>
<http>
<uri>http://proxy1</uri>
</http>
</proxy>
</clickhouse>

View File

@ -0,0 +1,84 @@
import logging
import time
from datetime import datetime
import hmac
import hashlib
import base64
import pytest
from helpers.cluster import ClickHouseCluster
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"proxy_list_node",
main_configs=["configs/config.d/proxy_list.xml"],
with_minio=True,
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def check_proxy_logs(cluster, proxy_instance, http_methods):
minio_ip = cluster.get_instance_ip("minio1")
for i in range(10):
logs = cluster.get_container_logs(proxy_instance)
# Check with retry that all possible interactions with Minio are present
for http_method in http_methods:
method_with_domain = http_method + " http://minio1"
method_with_ip = http_method + f" http://{minio_ip}"
logging.info(f"Method with ip: {method_with_ip}")
has_get_minio_logs = (
logs.find(method_with_domain) >= 0 or logs.find(method_with_ip) >= 0
)
if has_get_minio_logs:
return
time.sleep(1)
else:
assert False, "http method not found in logs"
def test_s3_with_proxy_list(cluster):
node = cluster.instances["proxy_list_node"]
# insert into function url uses POST and minio expects PUT
node.query(
"""
INSERT INTO FUNCTION
s3('http://minio1:9001/root/data/ch-proxy-test/test.csv', 'minio', 'minio123', 'CSV', 'key String, value String')
VALUES ('color','red'),('size','10')
"""
)
content_type = "application/zstd"
date = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S +0000")
resource = "/root/data/ch-proxy-test/test.csv"
get_sig_string = f"GET\n\n{content_type}\n{date}\n{resource}"
password = "minio123"
get_digest = hmac.new(
password.encode("utf-8"), get_sig_string.encode("utf-8"), hashlib.sha1
).digest()
get_signature = base64.b64encode(get_digest).decode("utf-8")
assert (
node.query(
"SELECT * FROM url('http://minio1:9001/root/data/ch-proxy-test/test.csv', 'CSV', 'a String, b String',"
f"headers('Host'='minio1', 'Date'= '{date}', 'Content-Type'='{content_type}',"
f"'Authorization'='AWS minio:{get_signature}')) FORMAT Values"
)
== "('color','red'),('size','10')"
)
check_proxy_logs(cluster, "proxy1", ["GET"])