mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Merge pull request #63427 from arthurpassos/fix_wrong_request_protocol_proxy
Several minor fixes to proxy support in ClickHouse
This commit is contained in:
commit
dd6ae2f77c
@ -43,12 +43,13 @@ namespace
|
|||||||
endpoint,
|
endpoint,
|
||||||
proxy_scheme,
|
proxy_scheme,
|
||||||
proxy_port,
|
proxy_port,
|
||||||
cache_ttl
|
std::chrono::seconds {cache_ttl}
|
||||||
};
|
};
|
||||||
|
|
||||||
return std::make_shared<RemoteProxyConfigurationResolver>(
|
return std::make_shared<RemoteProxyConfigurationResolver>(
|
||||||
server_configuration,
|
server_configuration,
|
||||||
request_protocol,
|
request_protocol,
|
||||||
|
std::make_shared<RemoteProxyHostFetcherImpl>(),
|
||||||
isTunnelingDisabledForHTTPSRequestsOverHTTPProxy(configuration));
|
isTunnelingDisabledForHTTPSRequestsOverHTTPProxy(configuration));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6,22 +6,47 @@
|
|||||||
#include <Poco/Net/HTTPRequest.h>
|
#include <Poco/Net/HTTPRequest.h>
|
||||||
#include <Poco/Net/HTTPResponse.h>
|
#include <Poco/Net/HTTPResponse.h>
|
||||||
#include <Common/logger_useful.h>
|
#include <Common/logger_useful.h>
|
||||||
#include <Common/DNSResolver.h>
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
{
|
{
|
||||||
extern const int BAD_ARGUMENTS;
|
extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string RemoteProxyHostFetcherImpl::fetch(const Poco::URI & endpoint, const ConnectionTimeouts & timeouts)
|
||||||
|
{
|
||||||
|
auto request = Poco::Net::HTTPRequest(Poco::Net::HTTPRequest::HTTP_GET, endpoint.getPath(), Poco::Net::HTTPRequest::HTTP_1_1);
|
||||||
|
auto session = makeHTTPSession(HTTPConnectionGroupType::HTTP, endpoint, timeouts);
|
||||||
|
|
||||||
|
session->sendRequest(request);
|
||||||
|
|
||||||
|
Poco::Net::HTTPResponse response;
|
||||||
|
auto & response_body_stream = session->receiveResponse(response);
|
||||||
|
|
||||||
|
if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK)
|
||||||
|
throw HTTPException(
|
||||||
|
ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER,
|
||||||
|
endpoint.toString(),
|
||||||
|
response.getStatus(),
|
||||||
|
response.getReason(),
|
||||||
|
"");
|
||||||
|
|
||||||
|
std::string proxy_host;
|
||||||
|
Poco::StreamCopier::copyToString(response_body_stream, proxy_host);
|
||||||
|
|
||||||
|
return proxy_host;
|
||||||
}
|
}
|
||||||
|
|
||||||
RemoteProxyConfigurationResolver::RemoteProxyConfigurationResolver(
|
RemoteProxyConfigurationResolver::RemoteProxyConfigurationResolver(
|
||||||
const RemoteServerConfiguration & remote_server_configuration_,
|
const RemoteServerConfiguration & remote_server_configuration_,
|
||||||
Protocol request_protocol_,
|
Protocol request_protocol_,
|
||||||
|
std::shared_ptr<RemoteProxyHostFetcher> fetcher_,
|
||||||
bool disable_tunneling_for_https_requests_over_http_proxy_
|
bool disable_tunneling_for_https_requests_over_http_proxy_
|
||||||
)
|
)
|
||||||
: ProxyConfigurationResolver(request_protocol_, disable_tunneling_for_https_requests_over_http_proxy_), remote_server_configuration(remote_server_configuration_)
|
: ProxyConfigurationResolver(request_protocol_, disable_tunneling_for_https_requests_over_http_proxy_),
|
||||||
|
remote_server_configuration(remote_server_configuration_), fetcher(fetcher_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -29,9 +54,7 @@ ProxyConfiguration RemoteProxyConfigurationResolver::resolve()
|
|||||||
{
|
{
|
||||||
auto logger = getLogger("RemoteProxyConfigurationResolver");
|
auto logger = getLogger("RemoteProxyConfigurationResolver");
|
||||||
|
|
||||||
auto & [endpoint, proxy_protocol, proxy_port, cache_ttl_] = remote_server_configuration;
|
auto & [endpoint, proxy_protocol_string, proxy_port, cache_ttl] = remote_server_configuration;
|
||||||
|
|
||||||
LOG_DEBUG(logger, "Obtain proxy using resolver: {}", endpoint.toString());
|
|
||||||
|
|
||||||
std::lock_guard lock(cache_mutex);
|
std::lock_guard lock(cache_mutex);
|
||||||
|
|
||||||
@ -55,52 +78,18 @@ ProxyConfiguration RemoteProxyConfigurationResolver::resolve()
|
|||||||
.withSendTimeout(1)
|
.withSendTimeout(1)
|
||||||
.withReceiveTimeout(1);
|
.withReceiveTimeout(1);
|
||||||
|
|
||||||
try
|
const auto proxy_host = fetcher->fetch(endpoint, timeouts);
|
||||||
{
|
|
||||||
/// It should be just empty GET request.
|
|
||||||
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, endpoint.getPath(), Poco::Net::HTTPRequest::HTTP_1_1);
|
|
||||||
|
|
||||||
const auto & host = endpoint.getHost();
|
LOG_DEBUG(logger, "Use proxy: {}://{}:{}", proxy_protocol_string, proxy_host, proxy_port);
|
||||||
auto resolved_hosts = DNSResolver::instance().resolveHostAll(host);
|
|
||||||
|
|
||||||
HTTPSessionPtr session;
|
auto proxy_protocol = ProxyConfiguration::protocolFromString(proxy_protocol_string);
|
||||||
|
|
||||||
for (size_t i = 0; i < resolved_hosts.size(); ++i)
|
|
||||||
{
|
|
||||||
auto resolved_endpoint = endpoint;
|
|
||||||
resolved_endpoint.setHost(resolved_hosts[i].toString());
|
|
||||||
session = makeHTTPSession(HTTPConnectionGroupType::HTTP, resolved_endpoint, timeouts);
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
session->sendRequest(request);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
if (i + 1 == resolved_hosts.size())
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Poco::Net::HTTPResponse response;
|
|
||||||
auto & response_body_stream = session->receiveResponse(response);
|
|
||||||
|
|
||||||
if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK)
|
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Proxy resolver returned not OK status: {}", response.getReason());
|
|
||||||
|
|
||||||
String proxy_host;
|
|
||||||
/// Read proxy host as string from response body.
|
|
||||||
Poco::StreamCopier::copyToString(response_body_stream, proxy_host);
|
|
||||||
|
|
||||||
LOG_DEBUG(logger, "Use proxy: {}://{}:{}", proxy_protocol, proxy_host, proxy_port);
|
|
||||||
|
|
||||||
bool use_tunneling_for_https_requests_over_http_proxy = useTunneling(
|
bool use_tunneling_for_https_requests_over_http_proxy = useTunneling(
|
||||||
request_protocol,
|
request_protocol,
|
||||||
cached_config.protocol,
|
proxy_protocol,
|
||||||
disable_tunneling_for_https_requests_over_http_proxy);
|
disable_tunneling_for_https_requests_over_http_proxy);
|
||||||
|
|
||||||
cached_config.protocol = ProxyConfiguration::protocolFromString(proxy_protocol);
|
cached_config.protocol = proxy_protocol;
|
||||||
cached_config.host = proxy_host;
|
cached_config.host = proxy_host;
|
||||||
cached_config.port = proxy_port;
|
cached_config.port = proxy_port;
|
||||||
cached_config.tunneling = use_tunneling_for_https_requests_over_http_proxy;
|
cached_config.tunneling = use_tunneling_for_https_requests_over_http_proxy;
|
||||||
@ -110,12 +99,6 @@ ProxyConfiguration RemoteProxyConfigurationResolver::resolve()
|
|||||||
|
|
||||||
return cached_config;
|
return cached_config;
|
||||||
}
|
}
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
tryLogCurrentException("RemoteProxyConfigurationResolver", "Failed to obtain proxy");
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void RemoteProxyConfigurationResolver::errorReport(const ProxyConfiguration & config)
|
void RemoteProxyConfigurationResolver::errorReport(const ProxyConfiguration & config)
|
||||||
{
|
{
|
||||||
@ -124,7 +107,7 @@ void RemoteProxyConfigurationResolver::errorReport(const ProxyConfiguration & co
|
|||||||
|
|
||||||
std::lock_guard lock(cache_mutex);
|
std::lock_guard lock(cache_mutex);
|
||||||
|
|
||||||
if (!cache_ttl.count() || !cache_valid)
|
if (!remote_server_configuration.cache_ttl_.count() || !cache_valid)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (std::tie(cached_config.protocol, cached_config.host, cached_config.port)
|
if (std::tie(cached_config.protocol, cached_config.host, cached_config.port)
|
||||||
|
@ -10,6 +10,19 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
|
struct ConnectionTimeouts;
|
||||||
|
|
||||||
|
struct RemoteProxyHostFetcher
|
||||||
|
{
|
||||||
|
virtual ~RemoteProxyHostFetcher() = default;
|
||||||
|
virtual std::string fetch(const Poco::URI & endpoint, const ConnectionTimeouts & timeouts) = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct RemoteProxyHostFetcherImpl : public RemoteProxyHostFetcher
|
||||||
|
{
|
||||||
|
std::string fetch(const Poco::URI & endpoint, const ConnectionTimeouts & timeouts) override;
|
||||||
|
};
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Makes an HTTP GET request to the specified endpoint to obtain a proxy host.
|
* Makes an HTTP GET request to the specified endpoint to obtain a proxy host.
|
||||||
* */
|
* */
|
||||||
@ -22,13 +35,14 @@ public:
|
|||||||
Poco::URI endpoint;
|
Poco::URI endpoint;
|
||||||
String proxy_protocol;
|
String proxy_protocol;
|
||||||
unsigned proxy_port;
|
unsigned proxy_port;
|
||||||
unsigned cache_ttl_;
|
const std::chrono::seconds cache_ttl_;
|
||||||
};
|
};
|
||||||
|
|
||||||
RemoteProxyConfigurationResolver(
|
RemoteProxyConfigurationResolver(
|
||||||
const RemoteServerConfiguration & remote_server_configuration_,
|
const RemoteServerConfiguration & remote_server_configuration_,
|
||||||
Protocol request_protocol_,
|
Protocol request_protocol_,
|
||||||
bool disable_tunneling_for_https_requests_over_http_proxy_ = true);
|
std::shared_ptr<RemoteProxyHostFetcher> fetcher_,
|
||||||
|
bool disable_tunneling_for_https_requests_over_http_proxy_ = false);
|
||||||
|
|
||||||
ProxyConfiguration resolve() override;
|
ProxyConfiguration resolve() override;
|
||||||
|
|
||||||
@ -36,11 +50,11 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
RemoteServerConfiguration remote_server_configuration;
|
RemoteServerConfiguration remote_server_configuration;
|
||||||
|
std::shared_ptr<RemoteProxyHostFetcher> fetcher;
|
||||||
|
|
||||||
std::mutex cache_mutex;
|
std::mutex cache_mutex;
|
||||||
bool cache_valid = false;
|
bool cache_valid = false;
|
||||||
std::chrono::time_point<std::chrono::system_clock> cache_timestamp;
|
std::chrono::time_point<std::chrono::system_clock> cache_timestamp;
|
||||||
const std::chrono::seconds cache_ttl{0};
|
|
||||||
ProxyConfiguration cached_config;
|
ProxyConfiguration cached_config;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
172
src/Common/tests/gtest_proxy_remote_configuration_resolver.cpp
Normal file
172
src/Common/tests/gtest_proxy_remote_configuration_resolver.cpp
Normal file
@ -0,0 +1,172 @@
|
|||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#include <Common/RemoteProxyConfigurationResolver.h>
|
||||||
|
#include <Poco/URI.h>
|
||||||
|
#include <IO/ConnectionTimeouts.h>
|
||||||
|
#include <base/sleep.h>
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
|
||||||
|
struct RemoteProxyHostFetcherMock : public DB::RemoteProxyHostFetcher
|
||||||
|
{
|
||||||
|
explicit RemoteProxyHostFetcherMock(const std::string & return_mock_) : return_mock(return_mock_) {}
|
||||||
|
|
||||||
|
std::string fetch(const Poco::URI &, const DB::ConnectionTimeouts &) override
|
||||||
|
{
|
||||||
|
fetch_count++;
|
||||||
|
return return_mock;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string return_mock;
|
||||||
|
std::size_t fetch_count {0};
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
TEST(RemoteProxyConfigurationResolver, HTTPOverHTTP)
|
||||||
|
{
|
||||||
|
const char * proxy_server_mock = "proxy1";
|
||||||
|
auto remote_server_configuration = RemoteProxyConfigurationResolver::RemoteServerConfiguration
|
||||||
|
{
|
||||||
|
Poco::URI("not_important"),
|
||||||
|
"http",
|
||||||
|
80,
|
||||||
|
std::chrono::seconds {10}
|
||||||
|
};
|
||||||
|
|
||||||
|
RemoteProxyConfigurationResolver resolver(
|
||||||
|
remote_server_configuration,
|
||||||
|
ProxyConfiguration::Protocol::HTTP,
|
||||||
|
std::make_shared<RemoteProxyHostFetcherMock>(proxy_server_mock)
|
||||||
|
);
|
||||||
|
|
||||||
|
auto configuration = resolver.resolve();
|
||||||
|
|
||||||
|
ASSERT_EQ(configuration.host, proxy_server_mock);
|
||||||
|
ASSERT_EQ(configuration.port, 80);
|
||||||
|
ASSERT_EQ(configuration.protocol, ProxyConfiguration::Protocol::HTTP);
|
||||||
|
ASSERT_EQ(configuration.original_request_protocol, ProxyConfiguration::Protocol::HTTP);
|
||||||
|
ASSERT_EQ(configuration.tunneling, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(RemoteProxyConfigurationResolver, HTTPSOverHTTPS)
|
||||||
|
{
|
||||||
|
const char * proxy_server_mock = "proxy1";
|
||||||
|
auto remote_server_configuration = RemoteProxyConfigurationResolver::RemoteServerConfiguration
|
||||||
|
{
|
||||||
|
Poco::URI("not_important"),
|
||||||
|
"https",
|
||||||
|
443,
|
||||||
|
std::chrono::seconds {10}
|
||||||
|
};
|
||||||
|
|
||||||
|
RemoteProxyConfigurationResolver resolver(
|
||||||
|
remote_server_configuration,
|
||||||
|
ProxyConfiguration::Protocol::HTTPS,
|
||||||
|
std::make_shared<RemoteProxyHostFetcherMock>(proxy_server_mock)
|
||||||
|
);
|
||||||
|
|
||||||
|
auto configuration = resolver.resolve();
|
||||||
|
|
||||||
|
ASSERT_EQ(configuration.host, proxy_server_mock);
|
||||||
|
ASSERT_EQ(configuration.port, 443);
|
||||||
|
ASSERT_EQ(configuration.protocol, ProxyConfiguration::Protocol::HTTPS);
|
||||||
|
ASSERT_EQ(configuration.original_request_protocol, ProxyConfiguration::Protocol::HTTPS);
|
||||||
|
// tunneling should not be used, https over https.
|
||||||
|
ASSERT_EQ(configuration.tunneling, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(RemoteProxyConfigurationResolver, HTTPSOverHTTP)
|
||||||
|
{
|
||||||
|
const char * proxy_server_mock = "proxy1";
|
||||||
|
auto remote_server_configuration = RemoteProxyConfigurationResolver::RemoteServerConfiguration
|
||||||
|
{
|
||||||
|
Poco::URI("not_important"),
|
||||||
|
"http",
|
||||||
|
80,
|
||||||
|
std::chrono::seconds {10}
|
||||||
|
};
|
||||||
|
|
||||||
|
RemoteProxyConfigurationResolver resolver(
|
||||||
|
remote_server_configuration,
|
||||||
|
ProxyConfiguration::Protocol::HTTPS,
|
||||||
|
std::make_shared<RemoteProxyHostFetcherMock>(proxy_server_mock)
|
||||||
|
);
|
||||||
|
|
||||||
|
auto configuration = resolver.resolve();
|
||||||
|
|
||||||
|
ASSERT_EQ(configuration.host, proxy_server_mock);
|
||||||
|
ASSERT_EQ(configuration.port, 80);
|
||||||
|
ASSERT_EQ(configuration.protocol, ProxyConfiguration::Protocol::HTTP);
|
||||||
|
ASSERT_EQ(configuration.original_request_protocol, ProxyConfiguration::Protocol::HTTPS);
|
||||||
|
// tunneling should be used, https over http.
|
||||||
|
ASSERT_EQ(configuration.tunneling, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(RemoteProxyConfigurationResolver, HTTPSOverHTTPNoTunneling)
|
||||||
|
{
|
||||||
|
const char * proxy_server_mock = "proxy1";
|
||||||
|
auto remote_server_configuration = RemoteProxyConfigurationResolver::RemoteServerConfiguration
|
||||||
|
{
|
||||||
|
Poco::URI("not_important"),
|
||||||
|
"http",
|
||||||
|
80,
|
||||||
|
std::chrono::seconds {10}
|
||||||
|
};
|
||||||
|
|
||||||
|
RemoteProxyConfigurationResolver resolver(
|
||||||
|
remote_server_configuration,
|
||||||
|
ProxyConfiguration::Protocol::HTTPS,
|
||||||
|
std::make_shared<RemoteProxyHostFetcherMock>(proxy_server_mock),
|
||||||
|
true /* disable_tunneling_for_https_requests_over_http_proxy_ */
|
||||||
|
);
|
||||||
|
|
||||||
|
auto configuration = resolver.resolve();
|
||||||
|
|
||||||
|
ASSERT_EQ(configuration.host, proxy_server_mock);
|
||||||
|
ASSERT_EQ(configuration.port, 80);
|
||||||
|
ASSERT_EQ(configuration.protocol, ProxyConfiguration::Protocol::HTTP);
|
||||||
|
ASSERT_EQ(configuration.original_request_protocol, ProxyConfiguration::Protocol::HTTPS);
|
||||||
|
// tunneling should not be used: https over http, but tunneling is explicitly disabled.
|
||||||
|
ASSERT_EQ(configuration.tunneling, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(RemoteProxyConfigurationResolver, SimpleCacheTest)
|
||||||
|
{
|
||||||
|
const char * proxy_server_mock = "proxy1";
|
||||||
|
auto cache_ttl = 5u;
|
||||||
|
auto remote_server_configuration = RemoteProxyConfigurationResolver::RemoteServerConfiguration
|
||||||
|
{
|
||||||
|
Poco::URI("not_important"),
|
||||||
|
"http",
|
||||||
|
80,
|
||||||
|
std::chrono::seconds {cache_ttl}
|
||||||
|
};
|
||||||
|
|
||||||
|
auto fetcher_mock = std::make_shared<RemoteProxyHostFetcherMock>(proxy_server_mock);
|
||||||
|
|
||||||
|
RemoteProxyConfigurationResolver resolver(
|
||||||
|
remote_server_configuration,
|
||||||
|
ProxyConfiguration::Protocol::HTTP,
|
||||||
|
fetcher_mock
|
||||||
|
);
|
||||||
|
|
||||||
|
resolver.resolve();
|
||||||
|
resolver.resolve();
|
||||||
|
resolver.resolve();
|
||||||
|
|
||||||
|
ASSERT_EQ(fetcher_mock->fetch_count, 1u);
|
||||||
|
|
||||||
|
sleepForSeconds(cache_ttl * 2);
|
||||||
|
|
||||||
|
resolver.resolve();
|
||||||
|
|
||||||
|
ASSERT_EQ(fetcher_mock->fetch_count, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -48,7 +48,7 @@ HTTPSessionPtr makeHTTPSession(
|
|||||||
HTTPConnectionGroupType group,
|
HTTPConnectionGroupType group,
|
||||||
const Poco::URI & uri,
|
const Poco::URI & uri,
|
||||||
const ConnectionTimeouts & timeouts,
|
const ConnectionTimeouts & timeouts,
|
||||||
ProxyConfiguration proxy_configuration)
|
const ProxyConfiguration & proxy_configuration)
|
||||||
{
|
{
|
||||||
auto connection_pool = HTTPConnectionPools::instance().getPool(group, uri, proxy_configuration);
|
auto connection_pool = HTTPConnectionPools::instance().getPool(group, uri, proxy_configuration);
|
||||||
return connection_pool->getConnection(timeouts);
|
return connection_pool->getConnection(timeouts);
|
||||||
|
@ -61,7 +61,7 @@ HTTPSessionPtr makeHTTPSession(
|
|||||||
HTTPConnectionGroupType group,
|
HTTPConnectionGroupType group,
|
||||||
const Poco::URI & uri,
|
const Poco::URI & uri,
|
||||||
const ConnectionTimeouts & timeouts,
|
const ConnectionTimeouts & timeouts,
|
||||||
ProxyConfiguration proxy_config = {}
|
const ProxyConfiguration & proxy_config = {}
|
||||||
);
|
);
|
||||||
|
|
||||||
bool isRedirect(Poco::Net::HTTPResponse::HTTPStatus status);
|
bool isRedirect(Poco::Net::HTTPResponse::HTTPStatus status);
|
||||||
|
@ -305,8 +305,7 @@ void PocoHTTPClient::makeRequestInternal(
|
|||||||
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
|
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
|
||||||
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const
|
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const
|
||||||
{
|
{
|
||||||
const auto request_configuration = per_request_configuration();
|
makeRequestInternalImpl(request, response, readLimiter, writeLimiter);
|
||||||
makeRequestInternalImpl(request, request_configuration, response, readLimiter, writeLimiter);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
String getMethod(const Aws::Http::HttpRequest & request)
|
String getMethod(const Aws::Http::HttpRequest & request)
|
||||||
@ -330,7 +329,6 @@ String getMethod(const Aws::Http::HttpRequest & request)
|
|||||||
|
|
||||||
void PocoHTTPClient::makeRequestInternalImpl(
|
void PocoHTTPClient::makeRequestInternalImpl(
|
||||||
Aws::Http::HttpRequest & request,
|
Aws::Http::HttpRequest & request,
|
||||||
const DB::ProxyConfiguration & proxy_configuration,
|
|
||||||
std::shared_ptr<PocoHTTPResponse> & response,
|
std::shared_ptr<PocoHTTPResponse> & response,
|
||||||
Aws::Utils::RateLimits::RateLimiterInterface *,
|
Aws::Utils::RateLimits::RateLimiterInterface *,
|
||||||
Aws::Utils::RateLimits::RateLimiterInterface *) const
|
Aws::Utils::RateLimits::RateLimiterInterface *) const
|
||||||
@ -383,6 +381,7 @@ void PocoHTTPClient::makeRequestInternalImpl(
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
const auto proxy_configuration = per_request_configuration();
|
||||||
for (unsigned int attempt = 0; attempt <= s3_max_redirects; ++attempt)
|
for (unsigned int attempt = 0; attempt <= s3_max_redirects; ++attempt)
|
||||||
{
|
{
|
||||||
Poco::URI target_uri(uri);
|
Poco::URI target_uri(uri);
|
||||||
|
@ -156,7 +156,6 @@ private:
|
|||||||
|
|
||||||
void makeRequestInternalImpl(
|
void makeRequestInternalImpl(
|
||||||
Aws::Http::HttpRequest & request,
|
Aws::Http::HttpRequest & request,
|
||||||
const DB::ProxyConfiguration & proxy_configuration,
|
|
||||||
std::shared_ptr<PocoHTTPResponse> & response,
|
std::shared_ptr<PocoHTTPResponse> & response,
|
||||||
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
|
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
|
||||||
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const;
|
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const;
|
||||||
|
@ -457,7 +457,7 @@ std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> StorageURLSource:
|
|||||||
|
|
||||||
const auto settings = context_->getSettings();
|
const auto settings = context_->getSettings();
|
||||||
|
|
||||||
auto proxy_config = getProxyConfiguration(http_method);
|
auto proxy_config = getProxyConfiguration(request_uri.getScheme());
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -543,10 +543,11 @@ StorageURLSink::StorageURLSink(
|
|||||||
std::string content_type = FormatFactory::instance().getContentType(format, context, format_settings);
|
std::string content_type = FormatFactory::instance().getContentType(format, context, format_settings);
|
||||||
std::string content_encoding = toContentEncodingName(compression_method);
|
std::string content_encoding = toContentEncodingName(compression_method);
|
||||||
|
|
||||||
auto proxy_config = getProxyConfiguration(http_method);
|
auto poco_uri = Poco::URI(uri);
|
||||||
|
auto proxy_config = getProxyConfiguration(poco_uri.getScheme());
|
||||||
|
|
||||||
auto write_buffer = std::make_unique<WriteBufferFromHTTP>(
|
auto write_buffer = std::make_unique<WriteBufferFromHTTP>(
|
||||||
HTTPConnectionGroupType::STORAGE, Poco::URI(uri), http_method, content_type, content_encoding, headers, timeouts, DBMS_DEFAULT_BUFFER_SIZE, proxy_config
|
HTTPConnectionGroupType::STORAGE, poco_uri, http_method, content_type, content_encoding, headers, timeouts, DBMS_DEFAULT_BUFFER_SIZE, proxy_config
|
||||||
);
|
);
|
||||||
|
|
||||||
const auto & settings = context->getSettingsRef();
|
const auto & settings = context->getSettingsRef();
|
||||||
@ -1327,6 +1328,7 @@ std::optional<time_t> IStorageURLBase::tryGetLastModificationTime(
|
|||||||
.withBufSize(settings.max_read_buffer_size)
|
.withBufSize(settings.max_read_buffer_size)
|
||||||
.withRedirects(settings.max_http_get_redirects)
|
.withRedirects(settings.max_http_get_redirects)
|
||||||
.withHeaders(headers)
|
.withHeaders(headers)
|
||||||
|
.withProxy(proxy_config)
|
||||||
.create(credentials);
|
.create(credentials);
|
||||||
|
|
||||||
return buf->tryGetLastModificationTime();
|
return buf->tryGetLastModificationTime();
|
||||||
|
@ -2,21 +2,35 @@ import os
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
ALL_HTTP_METHODS = {"POST", "PUT", "GET", "HEAD", "CONNECT"}
|
||||||
|
|
||||||
|
|
||||||
def check_proxy_logs(
|
def check_proxy_logs(
|
||||||
cluster, proxy_instance, protocol, bucket, http_methods={"POST", "PUT", "GET"}
|
cluster, proxy_instances, protocol, bucket, requested_http_methods
|
||||||
):
|
):
|
||||||
for i in range(10):
|
for i in range(10):
|
||||||
logs = cluster.get_container_logs(proxy_instance)
|
|
||||||
# Check with retry that all possible interactions with Minio are present
|
# Check with retry that all possible interactions with Minio are present
|
||||||
for http_method in http_methods:
|
for http_method in ALL_HTTP_METHODS:
|
||||||
|
for proxy_instance in proxy_instances:
|
||||||
|
logs = cluster.get_container_logs(proxy_instance)
|
||||||
if (
|
if (
|
||||||
logs.find(http_method + f" {protocol}://minio1:9001/root/data/{bucket}")
|
logs.find(
|
||||||
|
http_method + f" {protocol}://minio1:9001/root/data/{bucket}"
|
||||||
|
)
|
||||||
>= 0
|
>= 0
|
||||||
):
|
):
|
||||||
return
|
if http_method not in requested_http_methods:
|
||||||
time.sleep(1)
|
assert (
|
||||||
|
False
|
||||||
|
), f"Found http method {http_method} for bucket {bucket} that should not be found in {proxy_instance} logs"
|
||||||
|
break
|
||||||
else:
|
else:
|
||||||
assert False, f"{http_methods} method not found in logs of {proxy_instance}"
|
if http_method in requested_http_methods:
|
||||||
|
assert (
|
||||||
|
False
|
||||||
|
), f"{http_method} method not found in logs of {proxy_instance} for bucket {bucket}"
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
def wait_resolver(cluster):
|
def wait_resolver(cluster):
|
||||||
@ -33,7 +47,7 @@ def wait_resolver(cluster):
|
|||||||
if response == "proxy1" or response == "proxy2":
|
if response == "proxy1" or response == "proxy2":
|
||||||
return
|
return
|
||||||
time.sleep(i)
|
time.sleep(i)
|
||||||
else:
|
|
||||||
assert False, "Resolver is not up"
|
assert False, "Resolver is not up"
|
||||||
|
|
||||||
|
|
||||||
@ -80,9 +94,33 @@ def perform_simple_queries(node, minio_endpoint):
|
|||||||
|
|
||||||
def simple_test(cluster, proxies, protocol, bucket):
|
def simple_test(cluster, proxies, protocol, bucket):
|
||||||
minio_endpoint = build_s3_endpoint(protocol, bucket)
|
minio_endpoint = build_s3_endpoint(protocol, bucket)
|
||||||
node = cluster.instances[f"{bucket}"]
|
node = cluster.instances[bucket]
|
||||||
|
|
||||||
perform_simple_queries(node, minio_endpoint)
|
perform_simple_queries(node, minio_endpoint)
|
||||||
|
|
||||||
for proxy in proxies:
|
check_proxy_logs(cluster, proxies, protocol, bucket, ["PUT", "GET", "HEAD"])
|
||||||
check_proxy_logs(cluster, proxy, protocol, bucket)
|
|
||||||
|
|
||||||
|
def simple_storage_test(cluster, node, proxies, policy):
|
||||||
|
node.query(
|
||||||
|
"""
|
||||||
|
CREATE TABLE s3_test (
|
||||||
|
id Int64,
|
||||||
|
data String
|
||||||
|
) ENGINE=MergeTree()
|
||||||
|
ORDER BY id
|
||||||
|
SETTINGS storage_policy='{}'
|
||||||
|
""".format(
|
||||||
|
policy
|
||||||
|
)
|
||||||
|
)
|
||||||
|
node.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
|
||||||
|
assert (
|
||||||
|
node.query("SELECT * FROM s3_test order by id FORMAT Values")
|
||||||
|
== "(0,'data'),(1,'data')"
|
||||||
|
)
|
||||||
|
|
||||||
|
node.query("DROP TABLE IF EXISTS s3_test SYNC")
|
||||||
|
|
||||||
|
# not checking for POST because it is in a different format
|
||||||
|
check_proxy_logs(cluster, proxies, "http", policy, ["PUT", "GET"])
|
||||||
|
@ -5,7 +5,10 @@ import bottle
|
|||||||
|
|
||||||
@bottle.route("/hostname")
|
@bottle.route("/hostname")
|
||||||
def index():
|
def index():
|
||||||
|
if random.randrange(2) == 0:
|
||||||
return "proxy1"
|
return "proxy1"
|
||||||
|
else:
|
||||||
|
return "proxy2"
|
||||||
|
|
||||||
|
|
||||||
bottle.run(host="0.0.0.0", port=8080)
|
bottle.run(host="0.0.0.0", port=8080)
|
||||||
|
@ -56,7 +56,7 @@ def test_s3_with_https_proxy_list(cluster):
|
|||||||
|
|
||||||
|
|
||||||
def test_s3_with_https_remote_proxy(cluster):
|
def test_s3_with_https_remote_proxy(cluster):
|
||||||
proxy_util.simple_test(cluster, ["proxy1"], "https", "remote_proxy_node")
|
proxy_util.simple_test(cluster, ["proxy1", "proxy2"], "https", "remote_proxy_node")
|
||||||
|
|
||||||
|
|
||||||
def test_s3_with_https_env_proxy(cluster):
|
def test_s3_with_https_env_proxy(cluster):
|
||||||
|
@ -2,7 +2,6 @@
|
|||||||
<proxy>
|
<proxy>
|
||||||
<http>
|
<http>
|
||||||
<uri>http://proxy1</uri>
|
<uri>http://proxy1</uri>
|
||||||
<uri>http://proxy2</uri>
|
|
||||||
</http>
|
</http>
|
||||||
</proxy>
|
</proxy>
|
||||||
</clickhouse>
|
</clickhouse>
|
||||||
|
@ -3,7 +3,7 @@
|
|||||||
<disks>
|
<disks>
|
||||||
<s3>
|
<s3>
|
||||||
<type>s3</type>
|
<type>s3</type>
|
||||||
<endpoint>http://minio1:9001/root/data/</endpoint>
|
<endpoint>http://minio1:9001/root/data/s3</endpoint>
|
||||||
<access_key_id>minio</access_key_id>
|
<access_key_id>minio</access_key_id>
|
||||||
<secret_access_key>minio123</secret_access_key>
|
<secret_access_key>minio123</secret_access_key>
|
||||||
</s3>
|
</s3>
|
||||||
|
@ -3,6 +3,7 @@ import time
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from helpers.cluster import ClickHouseCluster
|
from helpers.cluster import ClickHouseCluster
|
||||||
|
import helpers.s3_url_proxy_tests_util as proxy_util
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="module")
|
@pytest.fixture(scope="module")
|
||||||
@ -26,41 +27,8 @@ def cluster():
|
|||||||
cluster.shutdown()
|
cluster.shutdown()
|
||||||
|
|
||||||
|
|
||||||
def check_proxy_logs(cluster, proxy_instance, http_methods={"POST", "PUT", "GET"}):
|
|
||||||
for i in range(10):
|
|
||||||
logs = cluster.get_container_logs(proxy_instance)
|
|
||||||
# Check with retry that all possible interactions with Minio are present
|
|
||||||
for http_method in http_methods:
|
|
||||||
if logs.find(http_method + " http://minio1") >= 0:
|
|
||||||
return
|
|
||||||
time.sleep(1)
|
|
||||||
else:
|
|
||||||
assert False, f"{http_methods} method not found in logs of {proxy_instance}"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("policy", ["s3"])
|
@pytest.mark.parametrize("policy", ["s3"])
|
||||||
def test_s3_with_proxy_list(cluster, policy):
|
def test_s3_with_proxy_list(cluster, policy):
|
||||||
node = cluster.instances["node"]
|
proxy_util.simple_storage_test(
|
||||||
|
cluster, cluster.instances["node"], ["proxy1"], policy
|
||||||
node.query(
|
|
||||||
"""
|
|
||||||
CREATE TABLE s3_test (
|
|
||||||
id Int64,
|
|
||||||
data String
|
|
||||||
) ENGINE=MergeTree()
|
|
||||||
ORDER BY id
|
|
||||||
SETTINGS storage_policy='{}'
|
|
||||||
""".format(
|
|
||||||
policy
|
|
||||||
)
|
)
|
||||||
)
|
|
||||||
node.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
|
|
||||||
assert (
|
|
||||||
node.query("SELECT * FROM s3_test order by id FORMAT Values")
|
|
||||||
== "(0,'data'),(1,'data')"
|
|
||||||
)
|
|
||||||
|
|
||||||
node.query("DROP TABLE IF EXISTS s3_test SYNC")
|
|
||||||
|
|
||||||
for proxy in ["proxy1", "proxy2"]:
|
|
||||||
check_proxy_logs(cluster, proxy, ["PUT", "GET"])
|
|
||||||
|
@ -3,7 +3,7 @@
|
|||||||
<disks>
|
<disks>
|
||||||
<s3>
|
<s3>
|
||||||
<type>s3</type>
|
<type>s3</type>
|
||||||
<endpoint>http://minio1:9001/root/data/</endpoint>
|
<endpoint>http://minio1:9001/root/data/s3</endpoint>
|
||||||
<access_key_id>minio</access_key_id>
|
<access_key_id>minio</access_key_id>
|
||||||
<secret_access_key>minio123</secret_access_key>
|
<secret_access_key>minio123</secret_access_key>
|
||||||
<proxy>
|
<proxy>
|
||||||
@ -13,9 +13,10 @@
|
|||||||
</s3>
|
</s3>
|
||||||
<s3_with_resolver>
|
<s3_with_resolver>
|
||||||
<type>s3</type>
|
<type>s3</type>
|
||||||
<endpoint>http://minio1:9001/root/data/</endpoint>
|
<endpoint>http://minio1:9001/root/data/s3_with_resolver</endpoint>
|
||||||
<access_key_id>minio</access_key_id>
|
<access_key_id>minio</access_key_id>
|
||||||
<secret_access_key>minio123</secret_access_key>
|
<secret_access_key>minio123</secret_access_key>
|
||||||
|
<skip_access_check>true</skip_access_check>
|
||||||
<proxy>
|
<proxy>
|
||||||
<!--
|
<!--
|
||||||
At each interaction with S3 resolver sends empty GET request to specified endpoint URL to obtain proxy host.
|
At each interaction with S3 resolver sends empty GET request to specified endpoint URL to obtain proxy host.
|
||||||
|
@ -26,41 +26,8 @@ def cluster():
|
|||||||
cluster.shutdown()
|
cluster.shutdown()
|
||||||
|
|
||||||
|
|
||||||
def check_proxy_logs(cluster, proxy_instance, http_methods={"POST", "PUT", "GET"}):
|
|
||||||
for i in range(10):
|
|
||||||
logs = cluster.get_container_logs(proxy_instance)
|
|
||||||
# Check with retry that all possible interactions with Minio are present
|
|
||||||
for http_method in http_methods:
|
|
||||||
if logs.find(http_method + " http://minio1") >= 0:
|
|
||||||
return
|
|
||||||
time.sleep(1)
|
|
||||||
else:
|
|
||||||
assert False, f"{http_methods} method not found in logs of {proxy_instance}"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("policy", ["s3", "s3_with_resolver"])
|
@pytest.mark.parametrize("policy", ["s3", "s3_with_resolver"])
|
||||||
def test_s3_with_proxy_list(cluster, policy):
|
def test_s3_with_proxy_list(cluster, policy):
|
||||||
node = cluster.instances["node"]
|
proxy_util.simple_storage_test(
|
||||||
|
cluster, cluster.instances["node"], ["proxy1", "proxy2"], policy
|
||||||
node.query(
|
|
||||||
"""
|
|
||||||
CREATE TABLE s3_test (
|
|
||||||
id Int64,
|
|
||||||
data String
|
|
||||||
) ENGINE=MergeTree()
|
|
||||||
ORDER BY id
|
|
||||||
SETTINGS storage_policy='{}'
|
|
||||||
""".format(
|
|
||||||
policy
|
|
||||||
)
|
)
|
||||||
)
|
|
||||||
node.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
|
|
||||||
assert (
|
|
||||||
node.query("SELECT * FROM s3_test order by id FORMAT Values")
|
|
||||||
== "(0,'data'),(1,'data')"
|
|
||||||
)
|
|
||||||
|
|
||||||
node.query("DROP TABLE IF EXISTS s3_test SYNC")
|
|
||||||
|
|
||||||
for proxy in ["proxy1", "proxy2"]:
|
|
||||||
check_proxy_logs(cluster, proxy, ["PUT", "GET"])
|
|
||||||
|
@ -5,7 +5,10 @@ import bottle
|
|||||||
|
|
||||||
@bottle.route("/hostname")
|
@bottle.route("/hostname")
|
||||||
def index():
|
def index():
|
||||||
|
if random.randrange(2) == 0:
|
||||||
return "proxy1"
|
return "proxy1"
|
||||||
|
else:
|
||||||
|
return "proxy2"
|
||||||
|
|
||||||
|
|
||||||
bottle.run(host="0.0.0.0", port=8080)
|
bottle.run(host="0.0.0.0", port=8080)
|
||||||
|
@ -53,7 +53,7 @@ def test_s3_with_http_proxy_list(cluster):
|
|||||||
|
|
||||||
|
|
||||||
def test_s3_with_http_remote_proxy(cluster):
|
def test_s3_with_http_remote_proxy(cluster):
|
||||||
proxy_util.simple_test(cluster, ["proxy1"], "http", "remote_proxy_node")
|
proxy_util.simple_test(cluster, ["proxy1", "proxy2"], "http", "remote_proxy_node")
|
||||||
|
|
||||||
|
|
||||||
def test_s3_with_http_env_proxy(cluster):
|
def test_s3_with_http_env_proxy(cluster):
|
||||||
|
@ -1,9 +1,5 @@
|
|||||||
<clickhouse>
|
<clickhouse>
|
||||||
<proxy>
|
<proxy>
|
||||||
<http>
|
|
||||||
<uri>http://proxy1</uri>
|
|
||||||
<uri>http://proxy2</uri>
|
|
||||||
</http>
|
|
||||||
<https>
|
<https>
|
||||||
<uri>https://proxy1</uri>
|
<uri>https://proxy1</uri>
|
||||||
<uri>https://proxy2</uri>
|
<uri>https://proxy2</uri>
|
||||||
|
@ -5,7 +5,10 @@ import bottle
|
|||||||
|
|
||||||
@bottle.route("/hostname")
|
@bottle.route("/hostname")
|
||||||
def index():
|
def index():
|
||||||
|
if random.randrange(2) == 0:
|
||||||
return "proxy1"
|
return "proxy1"
|
||||||
|
else:
|
||||||
|
return "proxy2"
|
||||||
|
|
||||||
|
|
||||||
bottle.run(host="0.0.0.0", port=8080)
|
bottle.run(host="0.0.0.0", port=8080)
|
||||||
|
@ -61,7 +61,7 @@ def test_s3_with_https_proxy_list(cluster):
|
|||||||
|
|
||||||
|
|
||||||
def test_s3_with_https_remote_proxy(cluster):
|
def test_s3_with_https_remote_proxy(cluster):
|
||||||
proxy_util.simple_test(cluster, ["proxy1"], "https", "remote_proxy_node")
|
proxy_util.simple_test(cluster, ["proxy1", "proxy2"], "https", "remote_proxy_node")
|
||||||
|
|
||||||
|
|
||||||
def test_s3_with_https_env_proxy(cluster):
|
def test_s3_with_https_env_proxy(cluster):
|
||||||
|
Loading…
Reference in New Issue
Block a user