From 2bade7db083e0a390d51360621f88b7b6b2ee12e Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 24 Aug 2023 10:07:26 -0300 Subject: [PATCH] Add global proxy setting (#51749) * initial impl * fix env ut * move ut directory * make sure no null proxy resolver is returned by ProxyConfigurationResolverProvider * minor adjustment * add a few tests, still incomplete * add proxy support for url table function * use proxy for select from url as well * remove optional from return type, just returns empty config * fix style * style * black * ohg boy * rm in progress file * god pls don't let me kill anyone * ... * add use_aws guards * remove hard coded s3 proxy resolver * add concurrency-mt-unsafe * aa * black * add logging back * revert change * imrpove code a bit * helper functions and separate tests * for some reason, this env test is not working.. * formatting * :) * clangtidy * lint * revert some stupid things * small test adjusmtments * simplify tests * rename test * remove extra line * freaking style change * simplify a bit * fix segfault & remove an extra call * tightly couple proxy provider with context.. * remove useless include * rename config prefix parameter * simplify provider a bit * organize provider a bit * add a few comments * comment out proxy env tests * fix nullptr in unit tests * make sure old storage proxy config is properly covered without global context instance * move a few functions from class to anonymous namespace * fix no fallback for specific storage conf * change API to accept http method instead of bool * implement http/https distinction in listresolver, any still not implemented * implement http/https distinction in remote resolver * progress on code, improve tests and add url function working test * use protcol instead of method for http and https * small fix * few more adjustments * fix style * black * move enum to proxyconfiguration * wip * fix build * fix ut * delete atomicroundrobin class * remove stale include * add some tests.. 
need to spend some more time on the design.. * change design a bit * progress * use existing context for tests * rename aux function and fix ut * .. * rename test * try to simplify tests a bit * simplify tests a bit more * attempt to fix tests, accept more than one remote resolver * use proper log id * try waiting for resolver * proper wait logic * black * empty * address a few comments * refactor tests * remove old tests * baclk * use RAII to set/unset env * black * clang tidy * fix env proxy not respecting any * use log trace * fix wrong logic in getRemoteREsolver * fix wrong logic in getRemoteREsolver * fix test * remove unwanted code * remove ClientConfigurationperRequest and auxilary classes * remove unwanted code * remove adapter test * few adjustments and add test for s3 storage conf with new proxy settings * black * use chassert for context * Add getenv comment --- programs/keeper/CMakeLists.txt | 2 - src/Backups/BackupIO_S3.cpp | 3 +- .../EnvironmentProxyConfigurationResolver.cpp | 74 +++++++ .../EnvironmentProxyConfigurationResolver.h | 23 ++ src/Common/ProxyConfiguration.h | 51 +++++ src/Common/ProxyConfigurationResolver.h | 17 ++ .../ProxyConfigurationResolverProvider.cpp | 208 ++++++++++++++++++ .../ProxyConfigurationResolverProvider.h | 40 ++++ src/Common/ProxyListConfigurationResolver.cpp | 31 +++ src/Common/ProxyListConfigurationResolver.h | 31 +++ .../RemoteProxyConfigurationResolver.cpp} | 61 ++--- src/Common/RemoteProxyConfigurationResolver.h | 46 ++++ src/Common/tests/gtest_helper_functions.h | 22 ++ ..._proxy_configuration_resolver_provider.cpp | 122 ++++++++++ .../gtest_proxy_environment_configuration.cpp | 96 ++++++++ ...test_proxy_list_configuration_resolver.cpp | 26 +++ src/Coordination/KeeperSnapshotManagerS3.cpp | 3 +- .../ObjectStorages/S3/ProxyConfiguration.h | 25 --- .../S3/ProxyListConfiguration.cpp | 32 --- .../S3/ProxyListConfiguration.h | 32 --- .../S3/ProxyResolverConfiguration.h | 42 ---- 
src/Disks/ObjectStorages/S3/diskSettings.cpp | 83 +------ src/IO/HTTPCommon.cpp | 18 +- src/IO/HTTPCommon.h | 6 +- src/IO/ReadWriteBufferFromHTTP.cpp | 18 +- src/IO/ReadWriteBufferFromHTTP.h | 11 +- src/IO/S3/Client.cpp | 20 +- src/IO/S3/Client.h | 3 +- src/IO/S3/Credentials.cpp | 4 +- src/IO/S3/PocoHTTPClient.cpp | 24 +- src/IO/S3/PocoHTTPClient.h | 22 +- src/IO/S3/tests/gtest_aws_s3_client.cpp | 13 +- src/IO/WriteBufferFromHTTP.cpp | 5 +- src/IO/WriteBufferFromHTTP.h | 3 +- src/Storages/StorageS3.cpp | 3 +- src/Storages/StorageURL.cpp | 48 +++- .../helpers/s3_url_proxy_tests_util.py | 88 ++++++++ .../__init__.py | 0 .../configs/config.d/proxy_list.xml | 8 + .../configs/config.d/storage_conf.xml | 21 ++ .../configs/config.d/users.xml | 0 .../configs/config.xml | 7 + .../proxy-resolver/resolver.py | 0 .../test_s3_storage_conf_new_proxy/test.py | 66 ++++++ .../test_s3_storage_conf_proxy/__init__.py | 0 .../configs/config.d/storage_conf.xml | 0 .../configs/config.d/users.xml | 5 + .../configs/config.xml | 7 + .../proxy-resolver/resolver.py | 14 ++ .../test.py | 18 +- .../__init__.py | 0 .../configs/config.d/proxy_list.xml | 8 + .../configs/config.d/proxy_remote.xml | 15 ++ .../proxy-resolver/resolver.py | 11 + .../test.py | 59 +++++ .../__init__.py | 0 .../configs/config.d/proxy_list.xml | 12 + .../configs/config.d/proxy_remote.xml | 22 ++ .../configs/config.d/ssl.xml | 11 + .../configs/config.d/users.xml | 5 + .../configs/config.xml | 0 .../minio_certs/CAs/public.crt | 18 ++ .../minio_certs/private.key | 28 +++ .../minio_certs/public.crt | 19 ++ .../proxy-resolver/resolver.py | 11 + .../test.py | 67 ++++++ .../test_storage_url_with_proxy/__init__.py | 0 .../configs/config.d/proxy_list.xml | 7 + .../test_storage_url_with_proxy/test.py | 84 +++++++ 69 files changed, 1576 insertions(+), 303 deletions(-) create mode 100644 src/Common/EnvironmentProxyConfigurationResolver.cpp create mode 100644 src/Common/EnvironmentProxyConfigurationResolver.h create mode 100644 
src/Common/ProxyConfiguration.h create mode 100644 src/Common/ProxyConfigurationResolver.h create mode 100644 src/Common/ProxyConfigurationResolverProvider.cpp create mode 100644 src/Common/ProxyConfigurationResolverProvider.h create mode 100644 src/Common/ProxyListConfigurationResolver.cpp create mode 100644 src/Common/ProxyListConfigurationResolver.h rename src/{Disks/ObjectStorages/S3/ProxyResolverConfiguration.cpp => Common/RemoteProxyConfigurationResolver.cpp} (62%) create mode 100644 src/Common/RemoteProxyConfigurationResolver.h create mode 100644 src/Common/tests/gtest_proxy_configuration_resolver_provider.cpp create mode 100644 src/Common/tests/gtest_proxy_environment_configuration.cpp create mode 100644 src/Common/tests/gtest_proxy_list_configuration_resolver.cpp delete mode 100644 src/Disks/ObjectStorages/S3/ProxyConfiguration.h delete mode 100644 src/Disks/ObjectStorages/S3/ProxyListConfiguration.cpp delete mode 100644 src/Disks/ObjectStorages/S3/ProxyListConfiguration.h delete mode 100644 src/Disks/ObjectStorages/S3/ProxyResolverConfiguration.h create mode 100644 tests/integration/helpers/s3_url_proxy_tests_util.py rename tests/integration/{test_s3_with_proxy => test_s3_storage_conf_new_proxy}/__init__.py (100%) create mode 100644 tests/integration/test_s3_storage_conf_new_proxy/configs/config.d/proxy_list.xml create mode 100644 tests/integration/test_s3_storage_conf_new_proxy/configs/config.d/storage_conf.xml rename tests/integration/{test_s3_with_proxy => test_s3_storage_conf_new_proxy}/configs/config.d/users.xml (100%) create mode 100644 tests/integration/test_s3_storage_conf_new_proxy/configs/config.xml rename tests/integration/{test_s3_with_proxy => test_s3_storage_conf_new_proxy}/proxy-resolver/resolver.py (100%) create mode 100644 tests/integration/test_s3_storage_conf_new_proxy/test.py create mode 100644 tests/integration/test_s3_storage_conf_proxy/__init__.py rename tests/integration/{test_s3_with_proxy => 
test_s3_storage_conf_proxy}/configs/config.d/storage_conf.xml (100%) create mode 100644 tests/integration/test_s3_storage_conf_proxy/configs/config.d/users.xml create mode 100644 tests/integration/test_s3_storage_conf_proxy/configs/config.xml create mode 100644 tests/integration/test_s3_storage_conf_proxy/proxy-resolver/resolver.py rename tests/integration/{test_s3_with_proxy => test_s3_storage_conf_proxy}/test.py (76%) create mode 100644 tests/integration/test_s3_table_function_with_http_proxy/__init__.py create mode 100644 tests/integration/test_s3_table_function_with_http_proxy/configs/config.d/proxy_list.xml create mode 100644 tests/integration/test_s3_table_function_with_http_proxy/configs/config.d/proxy_remote.xml create mode 100644 tests/integration/test_s3_table_function_with_http_proxy/proxy-resolver/resolver.py create mode 100644 tests/integration/test_s3_table_function_with_http_proxy/test.py create mode 100644 tests/integration/test_s3_table_function_with_https_proxy/__init__.py create mode 100644 tests/integration/test_s3_table_function_with_https_proxy/configs/config.d/proxy_list.xml create mode 100644 tests/integration/test_s3_table_function_with_https_proxy/configs/config.d/proxy_remote.xml create mode 100644 tests/integration/test_s3_table_function_with_https_proxy/configs/config.d/ssl.xml create mode 100644 tests/integration/test_s3_table_function_with_https_proxy/configs/config.d/users.xml rename tests/integration/{test_s3_with_proxy => test_s3_table_function_with_https_proxy}/configs/config.xml (100%) create mode 100644 tests/integration/test_s3_table_function_with_https_proxy/minio_certs/CAs/public.crt create mode 100644 tests/integration/test_s3_table_function_with_https_proxy/minio_certs/private.key create mode 100644 tests/integration/test_s3_table_function_with_https_proxy/minio_certs/public.crt create mode 100644 tests/integration/test_s3_table_function_with_https_proxy/proxy-resolver/resolver.py create mode 100644 
tests/integration/test_s3_table_function_with_https_proxy/test.py create mode 100644 tests/integration/test_storage_url_with_proxy/__init__.py create mode 100644 tests/integration/test_storage_url_with_proxy/configs/config.d/proxy_list.xml create mode 100644 tests/integration/test_storage_url_with_proxy/test.py diff --git a/programs/keeper/CMakeLists.txt b/programs/keeper/CMakeLists.txt index a43a312ba54..981ed2432f1 100644 --- a/programs/keeper/CMakeLists.txt +++ b/programs/keeper/CMakeLists.txt @@ -133,8 +133,6 @@ if (BUILD_STANDALONE_KEEPER) ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/S3Capabilities.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/diskSettings.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/ProxyListConfiguration.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/ObjectStorages/S3/ProxyResolverConfiguration.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/createReadBufferFromFileBase.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Disks/IO/ReadBufferFromRemoteFSGather.cpp diff --git a/src/Backups/BackupIO_S3.cpp b/src/Backups/BackupIO_S3.cpp index 7926d0b2564..2482679fa4e 100644 --- a/src/Backups/BackupIO_S3.cpp +++ b/src/Backups/BackupIO_S3.cpp @@ -50,7 +50,8 @@ namespace context->getRemoteHostFilter(), static_cast(context->getGlobalContext()->getSettingsRef().s3_max_redirects), context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging, - /* for_disk_s3 = */ false, settings.request_settings.get_request_throttler, settings.request_settings.put_request_throttler); + /* for_disk_s3 = */ false, settings.request_settings.get_request_throttler, settings.request_settings.put_request_throttler, + s3_uri.uri.getScheme()); client_configuration.endpointOverride = s3_uri.endpoint; client_configuration.maxConnections = static_cast(context->getSettingsRef().s3_max_connections); diff --git 
a/src/Common/EnvironmentProxyConfigurationResolver.cpp b/src/Common/EnvironmentProxyConfigurationResolver.cpp new file mode 100644 index 00000000000..184e2ac5ca5 --- /dev/null +++ b/src/Common/EnvironmentProxyConfigurationResolver.cpp @@ -0,0 +1,74 @@ +#include "EnvironmentProxyConfigurationResolver.h" + +#include +#include + +namespace DB +{ + +/* + * Usually environment variables are upper-case, but it seems like proxy related variables are an exception. + * See https://unix.stackexchange.com/questions/212894/whats-the-right-format-for-the-http-proxy-environment-variable-caps-or-no-ca/212972#212972 + * */ +static constexpr auto PROXY_HTTP_ENVIRONMENT_VARIABLE = "http_proxy"; +static constexpr auto PROXY_HTTPS_ENVIRONMENT_VARIABLE = "https_proxy"; + +EnvironmentProxyConfigurationResolver::EnvironmentProxyConfigurationResolver(Protocol protocol_) + : protocol(protocol_) +{} + +namespace +{ + const char * getProxyHost(DB::ProxyConfiguration::Protocol protocol) + { + /* + * getenv is safe to use here because ClickHouse code does not make any call to `setenv` or `putenv` + * aside from tests and a very early call during startup: https://github.com/ClickHouse/ClickHouse/blob/master/src/Daemon/BaseDaemon.cpp#L791 + * */ + + if (protocol == DB::ProxyConfiguration::Protocol::HTTP) + { + return std::getenv(PROXY_HTTP_ENVIRONMENT_VARIABLE); // NOLINT(concurrency-mt-unsafe) + } + else if (protocol == DB::ProxyConfiguration::Protocol::HTTPS) + { + return std::getenv(PROXY_HTTPS_ENVIRONMENT_VARIABLE); // NOLINT(concurrency-mt-unsafe) + } + else + { + if (const char * http_proxy_host = std::getenv(PROXY_HTTP_ENVIRONMENT_VARIABLE)) // NOLINT(concurrency-mt-unsafe) + { + return http_proxy_host; + } + else + { + return std::getenv(PROXY_HTTPS_ENVIRONMENT_VARIABLE); // NOLINT(concurrency-mt-unsafe) + } + } + } +} + +ProxyConfiguration EnvironmentProxyConfigurationResolver::resolve() +{ + const auto * proxy_host = getProxyHost(protocol); + + if (!proxy_host) + { + return {}; + } + + 
auto uri = Poco::URI(proxy_host); + auto host = uri.getHost(); + auto scheme = uri.getScheme(); + auto port = uri.getPort(); + + LOG_TRACE(&Poco::Logger::get("EnvironmentProxyConfigurationResolver"), "Use proxy from environment: {}://{}:{}", scheme, host, port); + + return ProxyConfiguration { + host, + ProxyConfiguration::protocolFromString(scheme), + port + }; +} + +} diff --git a/src/Common/EnvironmentProxyConfigurationResolver.h b/src/Common/EnvironmentProxyConfigurationResolver.h new file mode 100644 index 00000000000..c0843f50c32 --- /dev/null +++ b/src/Common/EnvironmentProxyConfigurationResolver.h @@ -0,0 +1,23 @@ +#pragma once + +#include + +namespace DB +{ + +/* + * Grabs proxy configuration from environment variables (http_proxy and https_proxy). + * */ +class EnvironmentProxyConfigurationResolver : public ProxyConfigurationResolver +{ +public: + explicit EnvironmentProxyConfigurationResolver(Protocol protocol_); + + ProxyConfiguration resolve() override; + void errorReport(const ProxyConfiguration &) override {} + +private: + Protocol protocol; +}; + +} diff --git a/src/Common/ProxyConfiguration.h b/src/Common/ProxyConfiguration.h new file mode 100644 index 00000000000..cc951c004bc --- /dev/null +++ b/src/Common/ProxyConfiguration.h @@ -0,0 +1,51 @@ +#pragma once + +#include + +namespace DB +{ + +struct ProxyConfiguration +{ + enum class Protocol + { + HTTP, + HTTPS, + ANY + }; + + static auto protocolFromString(const std::string & str) + { + if (str == "http") + { + return Protocol::HTTP; + } + else if (str == "https") + { + return Protocol::HTTPS; + } + else + { + return Protocol::ANY; + } + } + + static auto protocolToString(Protocol protocol) + { + switch (protocol) + { + case Protocol::HTTP: + return "http"; + case Protocol::HTTPS: + return "https"; + case Protocol::ANY: + return "any"; + } + } + + std::string host; + Protocol protocol; + uint16_t port; +}; + +} diff --git a/src/Common/ProxyConfigurationResolver.h 
b/src/Common/ProxyConfigurationResolver.h new file mode 100644 index 00000000000..a9c2157ce39 --- /dev/null +++ b/src/Common/ProxyConfigurationResolver.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +namespace DB +{ + +struct ProxyConfigurationResolver +{ + using Protocol = ProxyConfiguration::Protocol; + + virtual ~ProxyConfigurationResolver() = default; + virtual ProxyConfiguration resolve() = 0; + virtual void errorReport(const ProxyConfiguration & config) = 0; +}; + +} diff --git a/src/Common/ProxyConfigurationResolverProvider.cpp b/src/Common/ProxyConfigurationResolverProvider.cpp new file mode 100644 index 00000000000..c6d6fe9a476 --- /dev/null +++ b/src/Common/ProxyConfigurationResolverProvider.cpp @@ -0,0 +1,208 @@ +#include + +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} + +namespace +{ + std::shared_ptr getRemoteResolver( + const String & config_prefix, const Poco::Util::AbstractConfiguration & configuration) + { + auto endpoint = Poco::URI(configuration.getString(config_prefix + ".endpoint")); + auto proxy_scheme = configuration.getString(config_prefix + ".proxy_scheme"); + if (proxy_scheme != "http" && proxy_scheme != "https") + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Only HTTP/HTTPS schemas allowed in proxy resolver config: {}", proxy_scheme); + auto proxy_port = configuration.getUInt(config_prefix + ".proxy_port"); + auto cache_ttl = configuration.getUInt(config_prefix + ".proxy_cache_time", 10); + + LOG_DEBUG(&Poco::Logger::get("ProxyConfigurationResolverProvider"), "Configured remote proxy resolver: {}, Scheme: {}, Port: {}", + endpoint.toString(), proxy_scheme, proxy_port); + + return std::make_shared(endpoint, proxy_scheme, proxy_port, cache_ttl); + } + + std::shared_ptr getRemoteResolver( + ProxyConfiguration::Protocol protocol, const String & config_prefix, const Poco::Util::AbstractConfiguration & configuration) + { + std::vector 
keys; + configuration.keys(config_prefix, keys); + + std::vector uris; + for (const auto & key : keys) + { + if (startsWith(key, "resolver")) + { + auto prefix_with_key = config_prefix + "." + key; + auto proxy_scheme_config_string = prefix_with_key + ".proxy_scheme"; + auto config_protocol = configuration.getString(proxy_scheme_config_string); + + if (ProxyConfiguration::Protocol::ANY == protocol || config_protocol == ProxyConfiguration::protocolToString(protocol)) + { + return getRemoteResolver(prefix_with_key, configuration); + } + } + } + + return nullptr; + } + + auto extractURIList(const String & config_prefix, const Poco::Util::AbstractConfiguration & configuration) + { + std::vector keys; + configuration.keys(config_prefix, keys); + + std::vector uris; + for (const auto & key : keys) + { + if (startsWith(key, "uri")) + { + Poco::URI proxy_uri(configuration.getString(config_prefix + "." + key)); + + if (proxy_uri.getScheme() != "http" && proxy_uri.getScheme() != "https") + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Only HTTP/HTTPS schemas allowed in proxy uri: {}", proxy_uri.toString()); + if (proxy_uri.getHost().empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Empty host in proxy uri: {}", proxy_uri.toString()); + + uris.push_back(proxy_uri); + + LOG_DEBUG(&Poco::Logger::get("ProxyConfigurationResolverProvider"), "Configured proxy: {}", proxy_uri.toString()); + } + } + + return uris; + } + + std::shared_ptr getListResolverNewSyntax( + ProxyConfiguration::Protocol protocol, + const String & config_prefix, + const Poco::Util::AbstractConfiguration & configuration + ) + { + std::vector uris; + + bool include_http_uris = ProxyConfiguration::Protocol::ANY == protocol || ProxyConfiguration::Protocol::HTTP == protocol; + + if (include_http_uris && configuration.has(config_prefix + ".http")) + { + auto http_uris = extractURIList(config_prefix + ".http", configuration); + uris.insert(uris.end(), http_uris.begin(), http_uris.end()); + } + + bool 
include_https_uris = ProxyConfiguration::Protocol::ANY == protocol || ProxyConfiguration::Protocol::HTTPS == protocol; + + if (include_https_uris && configuration.has(config_prefix + ".https")) + { + auto https_uris = extractURIList(config_prefix + ".https", configuration); + uris.insert(uris.end(), https_uris.begin(), https_uris.end()); + } + + return uris.empty() ? nullptr : std::make_shared(uris); + } + + std::shared_ptr getListResolverOldSyntax( + const String & config_prefix, + const Poco::Util::AbstractConfiguration & configuration + ) + { + auto uris = extractURIList(config_prefix, configuration); + + return uris.empty() ? nullptr : std::make_shared(uris); + } + + std::shared_ptr getListResolver( + ProxyConfiguration::Protocol protocol, const String & config_prefix, const Poco::Util::AbstractConfiguration & configuration + ) + { + std::vector keys; + configuration.keys(config_prefix, keys); + + bool new_setting_syntax = std::find_if( + keys.begin(), + keys.end(), + [](const String & key) + { + return startsWith(key, "http") || startsWith(key, "https"); + }) != keys.end(); + + return new_setting_syntax ? getListResolverNewSyntax(protocol, config_prefix, configuration) + : getListResolverOldSyntax(config_prefix, configuration); + } +} + +std::shared_ptr ProxyConfigurationResolverProvider::get(Protocol protocol) +{ + auto context = Context::getGlobalContextInstance(); + + chassert(context); + + if (auto resolver = getFromSettings(protocol, "", context->getConfigRef())) + { + return resolver; + } + + return std::make_shared(protocol); +} + +std::shared_ptr ProxyConfigurationResolverProvider::getFromSettings( + Protocol protocol, + const String & config_prefix, + const Poco::Util::AbstractConfiguration & configuration +) +{ + auto proxy_prefix = config_prefix.empty() ? 
"proxy" : config_prefix + ".proxy"; + + if (configuration.has(proxy_prefix)) + { + std::vector config_keys; + configuration.keys(proxy_prefix, config_keys); + + if (auto remote_resolver = getRemoteResolver(protocol, proxy_prefix, configuration)) + { + return remote_resolver; + } + + if (auto list_resolver = getListResolver(protocol, proxy_prefix, configuration)) + { + return list_resolver; + } + } + + return nullptr; +} + +std::shared_ptr ProxyConfigurationResolverProvider::getFromOldSettingsFormat( + const String & config_prefix, + const Poco::Util::AbstractConfiguration & configuration +) +{ + /* + * First try to get it from settings only using the combination of config_prefix and configuration. + * This logic exists for backward compatibility with old S3 storage specific proxy configuration. + * */ + if (auto resolver = ProxyConfigurationResolverProvider::getFromSettings(Protocol::ANY, config_prefix, configuration)) + { + return resolver; + } + + /* + * In case the combination of config_prefix and configuration does not provide a resolver, try to get it from general / new settings. + * Falls back to Environment resolver if no configuration is found. + * */ + return ProxyConfigurationResolverProvider::get(Protocol::ANY); +} + +} diff --git a/src/Common/ProxyConfigurationResolverProvider.h b/src/Common/ProxyConfigurationResolverProvider.h new file mode 100644 index 00000000000..f8c0552bd8a --- /dev/null +++ b/src/Common/ProxyConfigurationResolverProvider.h @@ -0,0 +1,40 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + + +class ProxyConfigurationResolverProvider +{ +public: + + using Protocol = ProxyConfiguration::Protocol; + + /* + * Returns appropriate ProxyConfigurationResolver based on current CH settings (Remote resolver or List resolver). + * If no configuration is found, returns Environment Resolver. 
+ * */ + static std::shared_ptr get(Protocol protocol); + + /* + * This API exists exclusively for backward compatibility with old S3 storage specific proxy configuration. + * If no proxy configuration is found under config_prefix, falls back to the global proxy settings and then to the Environment resolver (never returns nullptr). + * */ + static std::shared_ptr getFromOldSettingsFormat( + const String & config_prefix, + const Poco::Util::AbstractConfiguration & configuration + ); + +private: + static std::shared_ptr getFromSettings( + Protocol protocol, + const String & config_prefix, + const Poco::Util::AbstractConfiguration & configuration + ); +}; + +} diff --git a/src/Common/ProxyListConfigurationResolver.cpp b/src/Common/ProxyListConfigurationResolver.cpp new file mode 100644 index 00000000000..e825cdde845 --- /dev/null +++ b/src/Common/ProxyListConfigurationResolver.cpp @@ -0,0 +1,31 @@ +#include + +#include +#include +#include + +namespace DB +{ + +ProxyListConfigurationResolver::ProxyListConfigurationResolver(std::vector proxies_) + : proxies(std::move(proxies_)) +{ +} + +ProxyConfiguration ProxyListConfigurationResolver::resolve() +{ + if (proxies.empty()) + { + return {}; + } + + /// Avoid atomic increment if number of proxies is 1. + size_t index = proxies.size() > 1 ? (access_counter++) % proxies.size() : 0; + + auto & proxy = proxies[index]; + + LOG_DEBUG(&Poco::Logger::get("ProxyListConfigurationResolver"), "Use proxy: {}", proxies[index].toString()); + return ProxyConfiguration {proxy.getHost(), ProxyConfiguration::protocolFromString(proxy.getScheme()), proxy.getPort()}; +} + +} diff --git a/src/Common/ProxyListConfigurationResolver.h b/src/Common/ProxyListConfigurationResolver.h new file mode 100644 index 00000000000..95b84adfef4 --- /dev/null +++ b/src/Common/ProxyListConfigurationResolver.h @@ -0,0 +1,31 @@ +#pragma once + +#include + +#include +#include + +namespace DB +{ + +/* + * Round-robin proxy list resolver. 
+ * */ +class ProxyListConfigurationResolver : public ProxyConfigurationResolver +{ +public: + explicit ProxyListConfigurationResolver(std::vector proxies_); + + ProxyConfiguration resolve() override; + + void errorReport(const ProxyConfiguration &) override {} + +private: + std::vector proxies; + + /// Access counter to get proxy using round-robin strategy. + std::atomic access_counter; + +}; + +} diff --git a/src/Disks/ObjectStorages/S3/ProxyResolverConfiguration.cpp b/src/Common/RemoteProxyConfigurationResolver.cpp similarity index 62% rename from src/Disks/ObjectStorages/S3/ProxyResolverConfiguration.cpp rename to src/Common/RemoteProxyConfigurationResolver.cpp index 18c644f3680..89fb954428c 100644 --- a/src/Disks/ObjectStorages/S3/ProxyResolverConfiguration.cpp +++ b/src/Common/RemoteProxyConfigurationResolver.cpp @@ -1,32 +1,36 @@ -#include "ProxyResolverConfiguration.h" - -#if USE_AWS_S3 +#include #include #include -#include "Poco/StreamCopier.h" +#include #include #include #include #include -namespace DB::ErrorCodes +namespace DB +{ + +namespace ErrorCodes { extern const int BAD_ARGUMENTS; } -namespace DB::S3 -{ - -ProxyResolverConfiguration::ProxyResolverConfiguration(const Poco::URI & endpoint_, String proxy_scheme_ - , unsigned proxy_port_, unsigned cache_ttl_) - : endpoint(endpoint_), proxy_scheme(std::move(proxy_scheme_)), proxy_port(proxy_port_), cache_ttl(cache_ttl_) +RemoteProxyConfigurationResolver::RemoteProxyConfigurationResolver( + const Poco::URI & endpoint_, + String proxy_protocol_, + unsigned proxy_port_, + unsigned cache_ttl_ +) +: endpoint(endpoint_), proxy_protocol(std::move(proxy_protocol_)), proxy_port(proxy_port_), cache_ttl(cache_ttl_) { } -ClientConfigurationPerRequest ProxyResolverConfiguration::getConfiguration(const Aws::Http::HttpRequest &) +ProxyConfiguration RemoteProxyConfigurationResolver::resolve() { - LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Obtain proxy using resolver: {}", endpoint.toString()); + auto * logger = 
&Poco::Logger::get("RemoteProxyConfigurationResolver"); + + LOG_DEBUG(logger, "Obtain proxy using resolver: {}", endpoint.toString()); std::lock_guard lock(cache_mutex); @@ -34,7 +38,12 @@ ClientConfigurationPerRequest ProxyResolverConfiguration::getConfiguration(const if (cache_ttl.count() && cache_valid && now <= cache_timestamp + cache_ttl && now >= cache_timestamp) { - LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Use cached proxy: {}://{}:{}", Aws::Http::SchemeMapper::ToString(cached_config.proxy_scheme), cached_config.proxy_host, cached_config.proxy_port); + LOG_DEBUG(logger, + "Use cached proxy: {}://{}:{}", + cached_config.protocol, + cached_config.host, + cached_config.port + ); return cached_config; } @@ -84,11 +93,11 @@ ClientConfigurationPerRequest ProxyResolverConfiguration::getConfiguration(const /// Read proxy host as string from response body. Poco::StreamCopier::copyToString(response_body_stream, proxy_host); - LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Use proxy: {}://{}:{}", proxy_scheme, proxy_host, proxy_port); + LOG_DEBUG(logger, "Use proxy: {}://{}:{}", proxy_protocol, proxy_host, proxy_port); - cached_config.proxy_scheme = Aws::Http::SchemeMapper::FromString(proxy_scheme.c_str()); - cached_config.proxy_host = proxy_host; - cached_config.proxy_port = proxy_port; + cached_config.protocol = ProxyConfiguration::protocolFromString(proxy_protocol); + cached_config.host = proxy_host; + cached_config.port = proxy_port; cache_timestamp = std::chrono::system_clock::now(); cache_valid = true; @@ -96,16 +105,14 @@ ClientConfigurationPerRequest ProxyResolverConfiguration::getConfiguration(const } catch (...) { - tryLogCurrentException("AWSClient", "Failed to obtain proxy"); - /// Don't use proxy if it can't be obtained. 
- ClientConfigurationPerRequest cfg; - return cfg; + tryLogCurrentException("RemoteProxyConfigurationResolver", "Failed to obtain proxy"); + return {}; } } -void ProxyResolverConfiguration::errorReport(const ClientConfigurationPerRequest & config) +void RemoteProxyConfigurationResolver::errorReport(const ProxyConfiguration & config) { - if (config.proxy_host.empty()) + if (config.host.empty()) return; std::lock_guard lock(cache_mutex); @@ -113,8 +120,8 @@ void ProxyResolverConfiguration::errorReport(const ClientConfigurationPerRequest if (!cache_ttl.count() || !cache_valid) return; - if (std::tie(cached_config.proxy_scheme, cached_config.proxy_host, cached_config.proxy_port) - != std::tie(config.proxy_scheme, config.proxy_host, config.proxy_port)) + if (std::tie(cached_config.protocol, cached_config.host, cached_config.port) + != std::tie(config.protocol, config.host, config.port)) return; /// Invalidate cached proxy when got error with this proxy @@ -122,5 +129,3 @@ void ProxyResolverConfiguration::errorReport(const ClientConfigurationPerRequest } } - -#endif diff --git a/src/Common/RemoteProxyConfigurationResolver.h b/src/Common/RemoteProxyConfigurationResolver.h new file mode 100644 index 00000000000..5f8b5f9ccd3 --- /dev/null +++ b/src/Common/RemoteProxyConfigurationResolver.h @@ -0,0 +1,46 @@ +#pragma once + +#include + +#include + +#include +#include + +namespace DB +{ + +/* + * Makes an HTTP GET request to the specified endpoint to obtain a proxy host. + * */ +class RemoteProxyConfigurationResolver : public ProxyConfigurationResolver +{ +public: + RemoteProxyConfigurationResolver( + const Poco::URI & endpoint_, + String proxy_protocol_, + unsigned proxy_port_, + unsigned cache_ttl_ + ); + + ProxyConfiguration resolve() override; + + void errorReport(const ProxyConfiguration & config) override; + +private: + + /// Endpoint to obtain a proxy host. + const Poco::URI endpoint; + /// Scheme for obtained proxy. 
+ const String proxy_protocol; + /// Port for obtained proxy. + const unsigned proxy_port; + + std::mutex cache_mutex; + bool cache_valid = false; + std::chrono::time_point cache_timestamp; + const std::chrono::seconds cache_ttl{0}; + ProxyConfiguration cached_config; +}; + +} diff --git a/src/Common/tests/gtest_helper_functions.h b/src/Common/tests/gtest_helper_functions.h index 9d2ec5bee41..54c9ae9170d 100644 --- a/src/Common/tests/gtest_helper_functions.h +++ b/src/Common/tests/gtest_helper_functions.h @@ -73,3 +73,25 @@ inline std::string xmlNodeAsString(Poco::XML::Node *pNode) result += ("\n"); return Poco::XML::fromXMLString(result); } + +struct EnvironmentProxySetter +{ + EnvironmentProxySetter(const Poco::URI & http_proxy, const Poco::URI & https_proxy) + { + if (!http_proxy.empty()) + { + setenv("http_proxy", http_proxy.toString().c_str(), 1); // NOLINT(concurrency-mt-unsafe) + } + + if (!https_proxy.empty()) + { + setenv("https_proxy", https_proxy.toString().c_str(), 1); // NOLINT(concurrency-mt-unsafe) + } + } + + ~EnvironmentProxySetter() + { + unsetenv("http_proxy"); // NOLINT(concurrency-mt-unsafe) + unsetenv("https_proxy"); // NOLINT(concurrency-mt-unsafe) + } +}; diff --git a/src/Common/tests/gtest_proxy_configuration_resolver_provider.cpp b/src/Common/tests/gtest_proxy_configuration_resolver_provider.cpp new file mode 100644 index 00000000000..b737a7e949e --- /dev/null +++ b/src/Common/tests/gtest_proxy_configuration_resolver_provider.cpp @@ -0,0 +1,122 @@ +#include + +#include +#include +#include + +using ConfigurationPtr = Poco::AutoPtr; + +class ProxyConfigurationResolverProviderTests : public ::testing::Test +{ +protected: + + static void SetUpTestSuite() { + context = getContext().context; + } + + static void TearDownTestSuite() { + context->setConfig(Poco::AutoPtr(new Poco::Util::MapConfiguration())); + } + + static DB::ContextMutablePtr context; +}; + +DB::ContextMutablePtr ProxyConfigurationResolverProviderTests::context; + +Poco::URI 
http_env_proxy_server = Poco::URI("http://http_environment_proxy:3128"); +Poco::URI https_env_proxy_server = Poco::URI("http://https_environment_proxy:3128"); + +Poco::URI http_list_proxy_server = Poco::URI("http://http_list_proxy:3128"); +Poco::URI https_list_proxy_server = Poco::URI("http://https_list_proxy:3128"); + +TEST_F(ProxyConfigurationResolverProviderTests, EnvironmentResolverShouldBeUsedIfNoSettings) +{ + EnvironmentProxySetter setter(http_env_proxy_server, https_env_proxy_server); + + auto http_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP)->resolve(); + auto https_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS)->resolve(); + + ASSERT_EQ(http_configuration.host, http_env_proxy_server.getHost()); + ASSERT_EQ(http_configuration.port, http_env_proxy_server.getPort()); + ASSERT_EQ(http_configuration.protocol, DB::ProxyConfiguration::protocolFromString(http_env_proxy_server.getScheme())); + + ASSERT_EQ(https_configuration.host, https_env_proxy_server.getHost()); + ASSERT_EQ(https_configuration.port, https_env_proxy_server.getPort()); + ASSERT_EQ(https_configuration.protocol, DB::ProxyConfiguration::protocolFromString(https_env_proxy_server.getScheme())); +} + +TEST_F(ProxyConfigurationResolverProviderTests, ListHTTPOnly) +{ + ConfigurationPtr config = Poco::AutoPtr(new Poco::Util::MapConfiguration()); + + config->setString("proxy", ""); + config->setString("proxy.http", ""); + config->setString("proxy.http.uri", http_list_proxy_server.toString()); + context->setConfig(config); + + auto http_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP)->resolve(); + + ASSERT_EQ(http_proxy_configuration.host, http_list_proxy_server.getHost()); + ASSERT_EQ(http_proxy_configuration.port, http_list_proxy_server.getPort()); + ASSERT_EQ(http_proxy_configuration.protocol, 
DB::ProxyConfiguration::protocolFromString(http_list_proxy_server.getScheme())); + + auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS)->resolve(); + + // No https configuration since it's not set + ASSERT_EQ(https_proxy_configuration.host, ""); + ASSERT_EQ(https_proxy_configuration.port, 0); +} + +TEST_F(ProxyConfigurationResolverProviderTests, ListHTTPSOnly) +{ + ConfigurationPtr config = Poco::AutoPtr(new Poco::Util::MapConfiguration()); + + config->setString("proxy", ""); + config->setString("proxy.https", ""); + config->setString("proxy.https.uri", https_list_proxy_server.toString()); + context->setConfig(config); + + auto http_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP)->resolve(); + + ASSERT_EQ(http_proxy_configuration.host, ""); + ASSERT_EQ(http_proxy_configuration.port, 0); + + auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS)->resolve(); + + ASSERT_EQ(https_proxy_configuration.host, https_list_proxy_server.getHost()); + + // still HTTP because the proxy host is not HTTPS + ASSERT_EQ(https_proxy_configuration.protocol, DB::ProxyConfiguration::protocolFromString(https_list_proxy_server.getScheme())); + ASSERT_EQ(https_proxy_configuration.port, https_list_proxy_server.getPort()); +} + +TEST_F(ProxyConfigurationResolverProviderTests, ListBoth) +{ + ConfigurationPtr config = Poco::AutoPtr(new Poco::Util::MapConfiguration()); + + config->setString("proxy", ""); + config->setString("proxy.http", ""); + config->setString("proxy.http.uri", http_list_proxy_server.toString()); + + config->setString("proxy", ""); + config->setString("proxy.https", ""); + config->setString("proxy.https.uri", https_list_proxy_server.toString()); + + context->setConfig(config); + + auto http_proxy_configuration = 
DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTP)->resolve(); + + ASSERT_EQ(http_proxy_configuration.host, http_list_proxy_server.getHost()); + ASSERT_EQ(http_proxy_configuration.protocol, DB::ProxyConfiguration::protocolFromString(http_list_proxy_server.getScheme())); + ASSERT_EQ(http_proxy_configuration.port, http_list_proxy_server.getPort()); + + auto https_proxy_configuration = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::Protocol::HTTPS)->resolve(); + + ASSERT_EQ(https_proxy_configuration.host, https_list_proxy_server.getHost()); + + // still HTTP because the proxy host is not HTTPS + ASSERT_EQ(https_proxy_configuration.protocol, DB::ProxyConfiguration::protocolFromString(https_list_proxy_server.getScheme())); + ASSERT_EQ(https_proxy_configuration.port, https_list_proxy_server.getPort()); +} + +// remote resolver is tricky to be tested in unit tests diff --git a/src/Common/tests/gtest_proxy_environment_configuration.cpp b/src/Common/tests/gtest_proxy_environment_configuration.cpp new file mode 100644 index 00000000000..b14307d879a --- /dev/null +++ b/src/Common/tests/gtest_proxy_environment_configuration.cpp @@ -0,0 +1,96 @@ +#include + +#include +#include +#include + +namespace +{ + auto http_proxy_server = Poco::URI("http://proxy_server:3128"); + auto https_proxy_server = Poco::URI("https://proxy_server:3128"); +} + +TEST(EnvironmentProxyConfigurationResolver, TestHTTP) +{ + EnvironmentProxySetter setter(http_proxy_server, {}); + + DB::EnvironmentProxyConfigurationResolver resolver(DB::ProxyConfiguration::Protocol::HTTP); + + auto configuration = resolver.resolve(); + + ASSERT_EQ(configuration.host, http_proxy_server.getHost()); + ASSERT_EQ(configuration.port, http_proxy_server.getPort()); + ASSERT_EQ(configuration.protocol, DB::ProxyConfiguration::protocolFromString(http_proxy_server.getScheme())); +} + +TEST(EnvironmentProxyConfigurationResolver, TestHTTPNoEnv) +{ + 
DB::EnvironmentProxyConfigurationResolver resolver(DB::ProxyConfiguration::Protocol::HTTP); + + auto configuration = resolver.resolve(); + + ASSERT_EQ(configuration.host, ""); + ASSERT_EQ(configuration.protocol, DB::ProxyConfiguration::Protocol::HTTP); + ASSERT_EQ(configuration.port, 0u); +} + +TEST(EnvironmentProxyConfigurationResolver, TestHTTPs) +{ + EnvironmentProxySetter setter({}, https_proxy_server); + + DB::EnvironmentProxyConfigurationResolver resolver(DB::ProxyConfiguration::Protocol::HTTPS); + + auto configuration = resolver.resolve(); + + ASSERT_EQ(configuration.host, https_proxy_server.getHost()); + ASSERT_EQ(configuration.port, https_proxy_server.getPort()); + ASSERT_EQ(configuration.protocol, DB::ProxyConfiguration::protocolFromString(https_proxy_server.getScheme())); +} + +TEST(EnvironmentProxyConfigurationResolver, TestHTTPsNoEnv) +{ + DB::EnvironmentProxyConfigurationResolver resolver(DB::ProxyConfiguration::Protocol::HTTPS); + + auto configuration = resolver.resolve(); + + ASSERT_EQ(configuration.host, ""); + ASSERT_EQ(configuration.protocol, DB::ProxyConfiguration::Protocol::HTTP); + ASSERT_EQ(configuration.port, 0u); +} + +TEST(EnvironmentProxyConfigurationResolver, TestANYHTTP) +{ + EnvironmentProxySetter setter(http_proxy_server, {}); + + DB::EnvironmentProxyConfigurationResolver resolver(DB::ProxyConfiguration::Protocol::ANY); + + auto configuration = resolver.resolve(); + + ASSERT_EQ(configuration.host, http_proxy_server.getHost()); + ASSERT_EQ(configuration.port, http_proxy_server.getPort()); + ASSERT_EQ(configuration.protocol, DB::ProxyConfiguration::protocolFromString(http_proxy_server.getScheme())); +} + +TEST(EnvironmentProxyConfigurationResolver, TestANYHTTPS) +{ + EnvironmentProxySetter setter({}, https_proxy_server); + + DB::EnvironmentProxyConfigurationResolver resolver(DB::ProxyConfiguration::Protocol::ANY); + + auto configuration = resolver.resolve(); + + ASSERT_EQ(configuration.host, https_proxy_server.getHost()); + 
ASSERT_EQ(configuration.port, https_proxy_server.getPort()); + ASSERT_EQ(configuration.protocol, DB::ProxyConfiguration::protocolFromString(https_proxy_server.getScheme())); +} + +TEST(EnvironmentProxyConfigurationResolver, TestANYNoEnv) +{ + DB::EnvironmentProxyConfigurationResolver resolver(DB::ProxyConfiguration::Protocol::ANY); + + auto configuration = resolver.resolve(); + + ASSERT_EQ(configuration.host, ""); + ASSERT_EQ(configuration.protocol, DB::ProxyConfiguration::Protocol::HTTP); + ASSERT_EQ(configuration.port, 0u); +} diff --git a/src/Common/tests/gtest_proxy_list_configuration_resolver.cpp b/src/Common/tests/gtest_proxy_list_configuration_resolver.cpp new file mode 100644 index 00000000000..4994cc3e2f7 --- /dev/null +++ b/src/Common/tests/gtest_proxy_list_configuration_resolver.cpp @@ -0,0 +1,26 @@ +#include + +#include +#include + +namespace +{ + auto proxy_server1 = Poco::URI("http://proxy_server1:3128"); + auto proxy_server2 = Poco::URI("http://proxy_server2:3128"); +} + +TEST(ProxyListConfigurationResolver, SimpleTest) +{ + DB::ProxyListConfigurationResolver resolver({proxy_server1, proxy_server2}); + + auto configuration1 = resolver.resolve(); + auto configuration2 = resolver.resolve(); + + ASSERT_EQ(configuration1.host, proxy_server1.getHost()); + ASSERT_EQ(configuration1.port, proxy_server1.getPort()); + ASSERT_EQ(configuration1.protocol, DB::ProxyConfiguration::protocolFromString(proxy_server1.getScheme())); + + ASSERT_EQ(configuration2.host, proxy_server2.getHost()); + ASSERT_EQ(configuration2.port, proxy_server2.getPort()); + ASSERT_EQ(configuration2.protocol, DB::ProxyConfiguration::protocolFromString(proxy_server2.getScheme())); +} diff --git a/src/Coordination/KeeperSnapshotManagerS3.cpp b/src/Coordination/KeeperSnapshotManagerS3.cpp index cbb5c110eda..e44b9f376c7 100644 --- a/src/Coordination/KeeperSnapshotManagerS3.cpp +++ b/src/Coordination/KeeperSnapshotManagerS3.cpp @@ -92,7 +92,8 @@ void 
KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo auth_settings.region, RemoteHostFilter(), s3_max_redirects, enable_s3_requests_logging, - /* for_disk_s3 = */ false, /* get_request_throttler = */ {}, /* put_request_throttler = */ {}); + /* for_disk_s3 = */ false, /* get_request_throttler = */ {}, /* put_request_throttler = */ {}, + new_uri.uri.getScheme()); client_configuration.endpointOverride = new_uri.endpoint; diff --git a/src/Disks/ObjectStorages/S3/ProxyConfiguration.h b/src/Disks/ObjectStorages/S3/ProxyConfiguration.h deleted file mode 100644 index fd2761c2cba..00000000000 --- a/src/Disks/ObjectStorages/S3/ProxyConfiguration.h +++ /dev/null @@ -1,25 +0,0 @@ -#pragma once - -#include "config.h" - -#if USE_AWS_S3 - -#include -#include -#include -#include - -namespace DB::S3 -{ -class ProxyConfiguration -{ -public: - virtual ~ProxyConfiguration() = default; - /// Returns proxy configuration on each HTTP request. - virtual ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request) = 0; - virtual void errorReport(const ClientConfigurationPerRequest & config) = 0; -}; - -} - -#endif diff --git a/src/Disks/ObjectStorages/S3/ProxyListConfiguration.cpp b/src/Disks/ObjectStorages/S3/ProxyListConfiguration.cpp deleted file mode 100644 index 7c7bc7966ea..00000000000 --- a/src/Disks/ObjectStorages/S3/ProxyListConfiguration.cpp +++ /dev/null @@ -1,32 +0,0 @@ -#include "ProxyListConfiguration.h" - -#if USE_AWS_S3 - -#include -#include - -namespace DB::S3 -{ -ProxyListConfiguration::ProxyListConfiguration(std::vector proxies_) : proxies(std::move(proxies_)), access_counter(0) -{ -} - - -ClientConfigurationPerRequest ProxyListConfiguration::getConfiguration(const Aws::Http::HttpRequest &) -{ - /// Avoid atomic increment if number of proxies is 1. - size_t index = proxies.size() > 1 ? 
(access_counter++) % proxies.size() : 0; - - ClientConfigurationPerRequest cfg; - cfg.proxy_scheme = Aws::Http::SchemeMapper::FromString(proxies[index].getScheme().c_str()); - cfg.proxy_host = proxies[index].getHost(); - cfg.proxy_port = proxies[index].getPort(); - - LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Use proxy: {}", proxies[index].toString()); - - return cfg; -} - -} - -#endif diff --git a/src/Disks/ObjectStorages/S3/ProxyListConfiguration.h b/src/Disks/ObjectStorages/S3/ProxyListConfiguration.h deleted file mode 100644 index 14fac8baff5..00000000000 --- a/src/Disks/ObjectStorages/S3/ProxyListConfiguration.h +++ /dev/null @@ -1,32 +0,0 @@ -#pragma once - -#include "config.h" - -#if USE_AWS_S3 - -#include // for std::atomic - -#include "ProxyConfiguration.h" - -namespace DB::S3 -{ -/** - * For each request to S3 it chooses a proxy from the specified list using round-robin strategy. - */ -class ProxyListConfiguration : public ProxyConfiguration -{ -public: - explicit ProxyListConfiguration(std::vector proxies_); - ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request) override; - void errorReport(const ClientConfigurationPerRequest &) override {} - -private: - /// List of configured proxies. - const std::vector proxies; - /// Access counter to get proxy using round-robin strategy. - std::atomic access_counter; -}; - -} - -#endif diff --git a/src/Disks/ObjectStorages/S3/ProxyResolverConfiguration.h b/src/Disks/ObjectStorages/S3/ProxyResolverConfiguration.h deleted file mode 100644 index d6d7456a6ac..00000000000 --- a/src/Disks/ObjectStorages/S3/ProxyResolverConfiguration.h +++ /dev/null @@ -1,42 +0,0 @@ -#pragma once - -#include "config.h" - -#if USE_AWS_S3 - -#include "ProxyConfiguration.h" - -#include - -namespace DB::S3 -{ -/** - * Proxy configuration where proxy host is obtained each time from specified endpoint. 
- * For each request to S3 it makes GET request to specified endpoint URL and reads proxy host from a response body. - * Specified scheme and port added to obtained proxy host to form completed proxy URL. - */ -class ProxyResolverConfiguration : public ProxyConfiguration -{ -public: - ProxyResolverConfiguration(const Poco::URI & endpoint_, String proxy_scheme_, unsigned proxy_port_, unsigned cache_ttl_); - ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request) override; - void errorReport(const ClientConfigurationPerRequest & config) override; - -private: - /// Endpoint to obtain a proxy host. - const Poco::URI endpoint; - /// Scheme for obtained proxy. - const String proxy_scheme; - /// Port for obtained proxy. - const unsigned proxy_port; - - std::mutex cache_mutex; - bool cache_valid = false; - std::chrono::time_point cache_timestamp; - const std::chrono::seconds cache_ttl{0}; - ClientConfigurationPerRequest cached_config; -}; - -} - -#endif diff --git a/src/Disks/ObjectStorages/S3/diskSettings.cpp b/src/Disks/ObjectStorages/S3/diskSettings.cpp index 0bd35c07a4b..ea40c49ff4b 100644 --- a/src/Disks/ObjectStorages/S3/diskSettings.cpp +++ b/src/Disks/ObjectStorages/S3/diskSettings.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -17,9 +18,6 @@ #include #include -#include -#include -#include #include #include #include @@ -44,76 +42,15 @@ std::unique_ptr getSettings(const Poco::Util::AbstractC config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000)); } -std::shared_ptr getProxyResolverConfiguration( - const String & prefix, const Poco::Util::AbstractConfiguration & proxy_resolver_config) -{ - auto endpoint = Poco::URI(proxy_resolver_config.getString(prefix + ".endpoint")); - auto proxy_scheme = proxy_resolver_config.getString(prefix + ".proxy_scheme"); - if (proxy_scheme != "http" && proxy_scheme != "https") - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Only HTTP/HTTPS schemas allowed 
in proxy resolver config: {}", proxy_scheme); - auto proxy_port = proxy_resolver_config.getUInt(prefix + ".proxy_port"); - auto cache_ttl = proxy_resolver_config.getUInt(prefix + ".proxy_cache_time", 10); - - LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Configured proxy resolver: {}, Scheme: {}, Port: {}", - endpoint.toString(), proxy_scheme, proxy_port); - - return std::make_shared(endpoint, proxy_scheme, proxy_port, cache_ttl); -} - -std::shared_ptr getProxyListConfiguration( - const String & prefix, const Poco::Util::AbstractConfiguration & proxy_config) -{ - std::vector keys; - proxy_config.keys(prefix, keys); - - std::vector proxies; - for (const auto & key : keys) - if (startsWith(key, "uri")) - { - Poco::URI proxy_uri(proxy_config.getString(prefix + "." + key)); - - if (proxy_uri.getScheme() != "http" && proxy_uri.getScheme() != "https") - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Only HTTP/HTTPS schemas allowed in proxy uri: {}", proxy_uri.toString()); - if (proxy_uri.getHost().empty()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Empty host in proxy uri: {}", proxy_uri.toString()); - - proxies.push_back(proxy_uri); - - LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Configured proxy: {}", proxy_uri.toString()); - } - - if (!proxies.empty()) - return std::make_shared(proxies); - - return nullptr; -} - -std::shared_ptr getProxyConfiguration(const String & prefix, const Poco::Util::AbstractConfiguration & config) -{ - if (!config.has(prefix + ".proxy")) - return nullptr; - - std::vector config_keys; - config.keys(prefix + ".proxy", config_keys); - - if (auto resolver_configs = std::count(config_keys.begin(), config_keys.end(), "resolver")) - { - if (resolver_configs > 1) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Multiple proxy resolver configurations aren't allowed"); - - return getProxyResolverConfiguration(prefix + ".proxy.resolver", config); - } - - return getProxyListConfiguration(prefix + ".proxy", config); -} - - std::unique_ptr getClient( const 
Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context, const S3ObjectStorageSettings & settings) { + String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint")); + S3::URI uri(endpoint); + S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration( config.getString(config_prefix + ".region", ""), context->getRemoteHostFilter(), @@ -121,10 +58,9 @@ std::unique_ptr getClient( context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging, /* for_disk_s3 = */ true, settings.request_settings.get_request_throttler, - settings.request_settings.put_request_throttler); + settings.request_settings.put_request_throttler, + uri.uri.getScheme()); - String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint")); - S3::URI uri(endpoint); if (uri.key.back() != '/') throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3 path must ends with '/', but '{}' doesn't.", uri.key); @@ -136,11 +72,14 @@ std::unique_ptr getClient( client_configuration.http_connection_pool_size = config.getUInt(config_prefix + ".http_connection_pool_size", 1000); client_configuration.wait_on_pool_size_limit = false; - auto proxy_config = getProxyConfiguration(config_prefix, config); + /* + * Override proxy configuration for backwards compatibility with old configuration format. 
+ * */ + auto proxy_config = DB::ProxyConfigurationResolverProvider::getFromOldSettingsFormat(config_prefix, config); if (proxy_config) { client_configuration.per_request_configuration - = [proxy_config](const auto & request) { return proxy_config->getConfiguration(request); }; + = [proxy_config]() { return proxy_config->resolve(); }; client_configuration.error_report = [proxy_config](const auto & request_config) { proxy_config->errorReport(request_config); }; } diff --git a/src/IO/HTTPCommon.cpp b/src/IO/HTTPCommon.cpp index ddd7ccbe483..add3e96c2c1 100644 --- a/src/IO/HTTPCommon.cpp +++ b/src/IO/HTTPCommon.cpp @@ -137,7 +137,12 @@ namespace throw Exception(ErrorCodes::UNSUPPORTED_URI_SCHEME, "Unsupported scheme in URI '{}'", uri.toString()); } - HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https, bool keep_alive) + HTTPSessionPtr makeHTTPSessionImpl( + const std::string & host, + UInt16 port, + bool https, + bool keep_alive, + Poco::Net::HTTPClientSession::ProxyConfig proxy_config = {}) { HTTPSessionPtr session; @@ -158,6 +163,9 @@ namespace /// doesn't work properly without patch session->setKeepAlive(keep_alive); + + session->setProxyConfig(proxy_config); + return session; } @@ -333,13 +341,17 @@ void setResponseDefaultHeaders(HTTPServerResponse & response, size_t keep_alive_ response.set("Keep-Alive", "timeout=" + std::to_string(timeout.totalSeconds())); } -HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts) +HTTPSessionPtr makeHTTPSession( + const Poco::URI & uri, + const ConnectionTimeouts & timeouts, + Poco::Net::HTTPClientSession::ProxyConfig proxy_config +) { const std::string & host = uri.getHost(); UInt16 port = uri.getPort(); bool https = isHTTPS(uri); - auto session = makeHTTPSessionImpl(host, port, https, false); + auto session = makeHTTPSessionImpl(host, port, https, false, proxy_config); setTimeouts(*session, timeouts); return session; } diff --git a/src/IO/HTTPCommon.h 
b/src/IO/HTTPCommon.h index f10fd748200..caf2fa361d9 100644 --- a/src/IO/HTTPCommon.h +++ b/src/IO/HTTPCommon.h @@ -69,7 +69,11 @@ void markSessionForReuse(PooledHTTPSessionPtr session); void setResponseDefaultHeaders(HTTPServerResponse & response, size_t keep_alive_timeout); /// Create session object to perform requests and set required parameters. -HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts); +HTTPSessionPtr makeHTTPSession( + const Poco::URI & uri, + const ConnectionTimeouts & timeouts, + Poco::Net::HTTPClientSession::ProxyConfig proxy_config = {} +); /// As previous method creates session, but tooks it from pool, without and with proxy uri. PooledHTTPSessionPtr makePooledHTTPSession( diff --git a/src/IO/ReadWriteBufferFromHTTP.cpp b/src/IO/ReadWriteBufferFromHTTP.cpp index c662c30981b..76ff5980f14 100644 --- a/src/IO/ReadWriteBufferFromHTTP.cpp +++ b/src/IO/ReadWriteBufferFromHTTP.cpp @@ -250,7 +250,8 @@ ReadWriteBufferFromHTTPBase::ReadWriteBufferFromHTTPBase( bool delay_initialization, bool use_external_buffer_, bool http_skip_not_found_url_, - std::optional file_info_) + std::optional file_info_, + Poco::Net::HTTPClientSession::ProxyConfig proxy_config_) : SeekableReadBuffer(nullptr, 0) , uri {uri_} , method {!method_.empty() ? method_ : out_stream_callback_ ? 
Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET} @@ -265,6 +266,7 @@ ReadWriteBufferFromHTTPBase::ReadWriteBufferFromHTTPBase( , http_skip_not_found_url(http_skip_not_found_url_) , settings {settings_} , log(&Poco::Logger::get("ReadWriteBufferFromHTTP")) + , proxy_config(proxy_config_) { if (settings.http_max_tries <= 0 || settings.http_retry_initial_backoff_ms <= 0 || settings.http_retry_initial_backoff_ms >= settings.http_retry_max_backoff_ms) @@ -848,12 +850,12 @@ HTTPFileInfo ReadWriteBufferFromHTTPBase::parseFileInfo(con } -SessionFactory::SessionFactory(const ConnectionTimeouts & timeouts_) - : timeouts(timeouts_) {} +SessionFactory::SessionFactory(const ConnectionTimeouts & timeouts_, Poco::Net::HTTPClientSession::ProxyConfig proxy_config_) + : timeouts(timeouts_), proxy_config(proxy_config_) {} SessionFactory::SessionType SessionFactory::buildNewSession(const Poco::URI & uri) { - return makeHTTPSession(uri, timeouts); + return makeHTTPSession(uri, timeouts, proxy_config); } ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP( @@ -870,9 +872,10 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP( bool delay_initialization_, bool use_external_buffer_, bool skip_not_found_url_, - std::optional file_info_) + std::optional file_info_, + Poco::Net::HTTPClientSession::ProxyConfig proxy_config_) : Parent( - std::make_shared(uri_, max_redirects, std::make_shared(timeouts)), + std::make_shared(uri_, max_redirects, std::make_shared(timeouts, proxy_config_)), uri_, credentials_, method_, @@ -884,7 +887,8 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP( delay_initialization_, use_external_buffer_, skip_not_found_url_, - file_info_) {} + file_info_, + proxy_config_) {} PooledSessionFactory::PooledSessionFactory( diff --git a/src/IO/ReadWriteBufferFromHTTP.h b/src/IO/ReadWriteBufferFromHTTP.h index 2d2ae5fe724..7ab910735c0 100644 --- a/src/IO/ReadWriteBufferFromHTTP.h +++ b/src/IO/ReadWriteBufferFromHTTP.h @@ -111,6 +111,8 @@ namespace detail 
ReadSettings settings; Poco::Logger * log; + Poco::Net::HTTPClientSession::ProxyConfig proxy_config; + bool withPartialContent(const HTTPRange & range) const; size_t getOffset() const; @@ -161,7 +163,8 @@ namespace detail bool delay_initialization = false, bool use_external_buffer_ = false, bool http_skip_not_found_url_ = false, - std::optional file_info_ = std::nullopt); + std::optional file_info_ = std::nullopt, + Poco::Net::HTTPClientSession::ProxyConfig proxy_config_ = {}); void callWithRedirects(Poco::Net::HTTPResponse & response, const String & method_, bool throw_on_all_errors = false, bool for_object_info = false); @@ -212,13 +215,14 @@ namespace detail class SessionFactory { public: - explicit SessionFactory(const ConnectionTimeouts & timeouts_); + explicit SessionFactory(const ConnectionTimeouts & timeouts_, Poco::Net::HTTPClientSession::ProxyConfig proxy_config_ = {}); using SessionType = HTTPSessionPtr; SessionType buildNewSession(const Poco::URI & uri); private: ConnectionTimeouts timeouts; + Poco::Net::HTTPClientSession::ProxyConfig proxy_config; }; class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase>> @@ -241,7 +245,8 @@ public: bool delay_initialization_ = true, bool use_external_buffer_ = false, bool skip_not_found_url_ = false, - std::optional file_info_ = std::nullopt); + std::optional file_info_ = std::nullopt, + Poco::Net::HTTPClientSession::ProxyConfig proxy_config_ = {}); }; class PooledSessionFactory diff --git a/src/IO/S3/Client.cpp b/src/IO/S3/Client.cpp index 7e251dc415a..6a2791b20a0 100644 --- a/src/IO/S3/Client.cpp +++ b/src/IO/S3/Client.cpp @@ -24,6 +24,8 @@ #include #include +#include + namespace ProfileEvents { @@ -861,16 +863,28 @@ PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT bool enable_s3_requests_logging, bool for_disk_s3, const ThrottlerPtr & get_request_throttler, - const ThrottlerPtr & put_request_throttler) + const ThrottlerPtr & put_request_throttler, + const String & 
protocol) { - return PocoHTTPClientConfiguration( + auto proxy_configuration_resolver = DB::ProxyConfigurationResolverProvider::get(DB::ProxyConfiguration::protocolFromString(protocol)); + + auto per_request_configuration = [=] () { return proxy_configuration_resolver->resolve(); }; + auto error_report = [=] (const DB::ProxyConfiguration & req) { proxy_configuration_resolver->errorReport(req); }; + + auto config = PocoHTTPClientConfiguration( + per_request_configuration, force_region, remote_host_filter, s3_max_redirects, enable_s3_requests_logging, for_disk_s3, get_request_throttler, - put_request_throttler); + put_request_throttler, + error_report); + + config.scheme = Aws::Http::SchemeMapper::FromString(protocol.c_str()); + + return config; } } diff --git a/src/IO/S3/Client.h b/src/IO/S3/Client.h index 1b0fdcefe32..a2be7ff5566 100644 --- a/src/IO/S3/Client.h +++ b/src/IO/S3/Client.h @@ -314,7 +314,8 @@ public: bool enable_s3_requests_logging, bool for_disk_s3, const ThrottlerPtr & get_request_throttler, - const ThrottlerPtr & put_request_throttler); + const ThrottlerPtr & put_request_throttler, + const String & protocol = "https"); private: ClientFactory(); diff --git a/src/IO/S3/Credentials.cpp b/src/IO/S3/Credentials.cpp index d59bc4fd115..c8820496bfa 100644 --- a/src/IO/S3/Credentials.cpp +++ b/src/IO/S3/Credentials.cpp @@ -532,13 +532,13 @@ S3CredentialsProviderChain::S3CredentialsProviderChain( configuration.enable_s3_requests_logging, configuration.for_disk_s3, configuration.get_request_throttler, - configuration.put_request_throttler); + configuration.put_request_throttler, + Aws::Http::SchemeMapper::ToString(Aws::Http::Scheme::HTTP)); /// See MakeDefaultHttpResourceClientConfiguration(). 
/// This is part of EC2 metadata client, but unfortunately it can't be accessed from outside /// of contrib/aws/aws-cpp-sdk-core/source/internal/AWSHttpResourceClient.cpp aws_client_configuration.maxConnections = 2; - aws_client_configuration.scheme = Aws::Http::Scheme::HTTP; /// Explicitly set the proxy settings to empty/zero to avoid relying on defaults that could potentially change /// in the future. diff --git a/src/IO/S3/PocoHTTPClient.cpp b/src/IO/S3/PocoHTTPClient.cpp index 8ed50416f3f..90327d4dc2e 100644 --- a/src/IO/S3/PocoHTTPClient.cpp +++ b/src/IO/S3/PocoHTTPClient.cpp @@ -85,20 +85,24 @@ namespace DB::S3 { PocoHTTPClientConfiguration::PocoHTTPClientConfiguration( + std::function per_request_configuration_, const String & force_region_, const RemoteHostFilter & remote_host_filter_, unsigned int s3_max_redirects_, bool enable_s3_requests_logging_, bool for_disk_s3_, const ThrottlerPtr & get_request_throttler_, - const ThrottlerPtr & put_request_throttler_) - : force_region(force_region_) + const ThrottlerPtr & put_request_throttler_, + std::function error_report_) + : per_request_configuration(per_request_configuration_) + , force_region(force_region_) , remote_host_filter(remote_host_filter_) , s3_max_redirects(s3_max_redirects_) , enable_s3_requests_logging(enable_s3_requests_logging_) , for_disk_s3(for_disk_s3_) , get_request_throttler(get_request_throttler_) , put_request_throttler(put_request_throttler_) + , error_report(error_report_) { } @@ -262,8 +266,8 @@ void PocoHTTPClient::makeRequestInternal( Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const { /// Most sessions in pool are already connected and it is not possible to set proxy host/port to a connected session. 
- const auto request_configuration = per_request_configuration(request); - if (http_connection_pool_size && request_configuration.proxy_host.empty()) + const auto request_configuration = per_request_configuration(); + if (http_connection_pool_size && request_configuration.host.empty()) makeRequestInternalImpl(request, request_configuration, response, readLimiter, writeLimiter); else makeRequestInternalImpl(request, request_configuration, response, readLimiter, writeLimiter); @@ -272,7 +276,7 @@ void PocoHTTPClient::makeRequestInternal( template void PocoHTTPClient::makeRequestInternalImpl( Aws::Http::HttpRequest & request, - const ClientConfigurationPerRequest & request_configuration, + const DB::ProxyConfiguration & request_configuration, std::shared_ptr & response, Aws::Utils::RateLimits::RateLimiterInterface *, Aws::Utils::RateLimits::RateLimiterInterface *) const @@ -327,7 +331,7 @@ void PocoHTTPClient::makeRequestInternalImpl( Poco::URI target_uri(uri); SessionPtr session; - if (!request_configuration.proxy_host.empty()) + if (!request_configuration.host.empty()) { if (enable_s3_requests_logging) LOG_TEST(log, "Due to reverse proxy host name ({}) won't be resolved on ClickHouse side", uri); @@ -339,12 +343,12 @@ void PocoHTTPClient::makeRequestInternalImpl( target_uri, timeouts, http_connection_pool_size, wait_on_pool_size_limit); else session = makeHTTPSession(target_uri, timeouts); - bool use_tunnel = request_configuration.proxy_scheme == Aws::Http::Scheme::HTTP && target_uri.getScheme() == "https"; + bool use_tunnel = request_configuration.protocol == DB::ProxyConfiguration::Protocol::HTTP && target_uri.getScheme() == "https"; session->setProxy( - request_configuration.proxy_host, - request_configuration.proxy_port, - Aws::Http::SchemeMapper::ToString(request_configuration.proxy_scheme), + request_configuration.host, + request_configuration.port, + DB::ProxyConfiguration::protocolToString(request_configuration.protocol), use_tunnel ); } diff --git 
a/src/IO/S3/PocoHTTPClient.h b/src/IO/S3/PocoHTTPClient.h index 92d3d5c5747..d1ce148a707 100644 --- a/src/IO/S3/PocoHTTPClient.h +++ b/src/IO/S3/PocoHTTPClient.h @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -34,16 +35,9 @@ namespace DB::S3 { class ClientFactory; -struct ClientConfigurationPerRequest -{ - Aws::Http::Scheme proxy_scheme = Aws::Http::Scheme::HTTPS; - String proxy_host; - unsigned proxy_port = 0; -}; - struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration { - std::function per_request_configuration = [] (const Aws::Http::HttpRequest &) { return ClientConfigurationPerRequest(); }; + std::function per_request_configuration; String force_region; const RemoteHostFilter & remote_host_filter; unsigned int s3_max_redirects; @@ -62,17 +56,19 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration void updateSchemeAndRegion(); - std::function error_report; + std::function error_report; private: PocoHTTPClientConfiguration( + std::function per_request_configuration_, const String & force_region_, const RemoteHostFilter & remote_host_filter_, unsigned int s3_max_redirects_, bool enable_s3_requests_logging_, bool for_disk_s3_, const ThrottlerPtr & get_request_throttler_, - const ThrottlerPtr & put_request_throttler_ + const ThrottlerPtr & put_request_throttler_, + std::function error_report_ ); /// Constructor of Aws::Client::ClientConfiguration must be called after AWS SDK initialization. 
@@ -165,7 +161,7 @@ private: template void makeRequestInternalImpl( Aws::Http::HttpRequest & request, - const ClientConfigurationPerRequest & per_request_configuration, + const DB::ProxyConfiguration & per_request_configuration, std::shared_ptr & response, Aws::Utils::RateLimits::RateLimiterInterface * readLimiter, Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const; @@ -174,8 +170,8 @@ protected: static S3MetricKind getMetricKind(const Aws::Http::HttpRequest & request); void addMetric(const Aws::Http::HttpRequest & request, S3MetricType type, ProfileEvents::Count amount = 1) const; - std::function per_request_configuration; - std::function error_report; + std::function per_request_configuration; + std::function error_report; ConnectionTimeouts timeouts; const RemoteHostFilter & remote_host_filter; unsigned int s3_max_redirects; diff --git a/src/IO/S3/tests/gtest_aws_s3_client.cpp b/src/IO/S3/tests/gtest_aws_s3_client.cpp index 5731e9061d6..6d589bcedd5 100644 --- a/src/IO/S3/tests/gtest_aws_s3_client.cpp +++ b/src/IO/S3/tests/gtest_aws_s3_client.cpp @@ -26,9 +26,19 @@ #include #include #include +#include #include "TestPocoHTTPServer.h" +/* + * When all tests are executed together, `Context::getGlobalContextInstance()` is not null. Global context is used by + * ProxyResolvers to get proxy configuration (used by S3 clients). If global context does not have a valid ConfigRef, it relies on + * Poco::Util::Application::instance() to grab the config. However, at this point, the application is not yet initialized and + * `Poco::Util::Application::instance()` returns nullptr. This causes the test to fail. To fix this, we create a dummy application that takes + * care of initialization. 
+ * */ +[[maybe_unused]] static Poco::Util::ServerApplication app; + class NoRetryStrategy : public Aws::Client::StandardRetryStrategy { @@ -125,7 +135,8 @@ void testServerSideEncryption( enable_s3_requests_logging, /* for_disk_s3 = */ false, /* get_request_throttler = */ {}, - /* put_request_throttler = */ {} + /* put_request_throttler = */ {}, + uri.uri.getScheme() ); client_configuration.endpointOverride = uri.endpoint; diff --git a/src/IO/WriteBufferFromHTTP.cpp b/src/IO/WriteBufferFromHTTP.cpp index 355c42a23c9..056b965266e 100644 --- a/src/IO/WriteBufferFromHTTP.cpp +++ b/src/IO/WriteBufferFromHTTP.cpp @@ -13,9 +13,10 @@ WriteBufferFromHTTP::WriteBufferFromHTTP( const std::string & content_encoding, const HTTPHeaderEntries & additional_headers, const ConnectionTimeouts & timeouts, - size_t buffer_size_) + size_t buffer_size_, + Poco::Net::HTTPClientSession::ProxyConfig proxy_configuration) : WriteBufferFromOStream(buffer_size_) - , session{makeHTTPSession(uri, timeouts)} + , session{makeHTTPSession(uri, timeouts, proxy_configuration)} , request{method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1} { request.setHost(uri.getHost()); diff --git a/src/IO/WriteBufferFromHTTP.h b/src/IO/WriteBufferFromHTTP.h index ce5020dfa78..65dc10213dc 100644 --- a/src/IO/WriteBufferFromHTTP.h +++ b/src/IO/WriteBufferFromHTTP.h @@ -25,7 +25,8 @@ public: const std::string & content_encoding = "", const HTTPHeaderEntries & additional_headers = {}, const ConnectionTimeouts & timeouts = {}, - size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE); + size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, + Poco::Net::HTTPClientSession::ProxyConfig proxy_configuration = {}); private: /// Receives response from the server after sending all data. 
diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 135d0e441c8..81fb640ac2b 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -1162,7 +1162,8 @@ void StorageS3::Configuration::connect(ContextPtr context) context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging, /* for_disk_s3 = */ false, request_settings.get_request_throttler, - request_settings.put_request_throttler); + request_settings.put_request_throttler, + url.uri.getScheme()); client_configuration.endpointOverride = url.endpoint; client_configuration.maxConnections = static_cast(request_settings.max_connections); diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index a76b8688a3e..ea03f43040a 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -161,6 +162,26 @@ namespace { return parseRemoteDescription(uri, 0, uri.size(), '|', max_addresses); } + + auto proxyConfigurationToPocoProxyConfiguration(const ProxyConfiguration & proxy_configuration) + { + Poco::Net::HTTPClientSession::ProxyConfig poco_proxy_config; + + poco_proxy_config.host = proxy_configuration.host; + poco_proxy_config.port = proxy_configuration.port; + poco_proxy_config.protocol = ProxyConfiguration::protocolToString(proxy_configuration.protocol); + + return poco_proxy_config; + } + + auto getProxyConfiguration(const std::string & protocol_string) + { + auto protocol = protocol_string == "https" ? 
ProxyConfigurationResolver::Protocol::HTTPS + : ProxyConfigurationResolver::Protocol::HTTP; + auto proxy_config = ProxyConfigurationResolverProvider::get(protocol)->resolve(); + + return proxyConfigurationToPocoProxyConfiguration(proxy_config); + } } class StorageURLSource::DisclosedGlobIterator::Impl @@ -402,6 +423,8 @@ std::pair> StorageURLSource: const auto settings = context->getSettings(); + auto proxy_config = getProxyConfiguration(http_method); + try { auto res = std::make_unique( @@ -417,7 +440,9 @@ std::pair> StorageURLSource: &context->getRemoteHostFilter(), delay_initialization, /* use_external_buffer */ false, - /* skip_url_not_found_error */ skip_url_not_found_error); + /* skip_url_not_found_error */ skip_url_not_found_error, + /* file_info */ std::nullopt, + proxy_config); if (context->getSettingsRef().engine_url_skip_empty_files && res->eof() && option != std::prev(end)) { @@ -464,10 +489,17 @@ StorageURLSink::StorageURLSink( std::string content_type = FormatFactory::instance().getContentType(format, context, format_settings); std::string content_encoding = toContentEncodingName(compression_method); + auto proxy_config = getProxyConfiguration(http_method); + + auto write_buffer = std::make_unique( + Poco::URI(uri), http_method, content_type, content_encoding, headers, timeouts, DBMS_DEFAULT_BUFFER_SIZE, proxy_config + ); + write_buf = wrapWriteBufferWithCompressionMethod( - std::make_unique(Poco::URI(uri), http_method, content_type, content_encoding, headers, timeouts), + std::move(write_buffer), compression_method, - 3); + 3 + ); writer = FormatFactory::instance().getOutputFormat(format, *write_buf, sample_block, context, format_settings); } @@ -948,8 +980,12 @@ std::optional IStorageURLBase::getLastModificationTime( try { + auto uri = Poco::URI(url); + + auto proxy_config = getProxyConfiguration(uri.getScheme()); + ReadWriteBufferFromHTTP buf( - Poco::URI(url), + uri, Poco::Net::HTTPRequest::HTTP_GET, {}, getHTTPTimeouts(context), @@ -961,7 +997,9 
@@ std::optional IStorageURLBase::getLastModificationTime( &context->getRemoteHostFilter(), true, false, - false); + false, + std::nullopt, + proxy_config); return buf.getLastModificationTime(); } diff --git a/tests/integration/helpers/s3_url_proxy_tests_util.py b/tests/integration/helpers/s3_url_proxy_tests_util.py new file mode 100644 index 00000000000..f86620ddcea --- /dev/null +++ b/tests/integration/helpers/s3_url_proxy_tests_util.py @@ -0,0 +1,88 @@ +import os +import time + + +def check_proxy_logs( + cluster, proxy_instance, protocol, bucket, http_methods={"POST", "PUT", "GET"} +): + for i in range(10): + logs = cluster.get_container_logs(proxy_instance) + # Check with retry that all possible interactions with Minio are present + for http_method in http_methods: + if ( + logs.find(http_method + f" {protocol}://minio1:9001/root/data/{bucket}") + >= 0 + ): + return + time.sleep(1) + else: + assert False, f"{http_methods} method not found in logs of {proxy_instance}" + + +def wait_resolver(cluster): + for i in range(10): + response = cluster.exec_in_container( + cluster.get_container_id("resolver"), + [ + "curl", + "-s", + f"http://resolver:8080/hostname", + ], + nothrow=True, + ) + if response == "proxy1": + return + time.sleep(i) + else: + assert False, "Resolver is not up" + + +# Runs simple proxy resolver in python env container. 
+def run_resolver(cluster, current_dir): + container_id = cluster.get_container_id("resolver") + cluster.copy_file_to_container( + container_id, + os.path.join(current_dir, "proxy-resolver", "resolver.py"), + "resolver.py", + ) + cluster.exec_in_container(container_id, ["python", "resolver.py"], detach=True) + + wait_resolver(cluster) + + +def build_s3_endpoint(protocol, bucket): + return f"{protocol}://minio1:9001/root/data/{bucket}/test.csv" + + +def perform_simple_queries(node, minio_endpoint): + node.query( + f""" + INSERT INTO FUNCTION + s3('{minio_endpoint}', 'minio', 'minio123', 'CSV', 'key String, value String') + VALUES ('color','red'),('size','10') + """ + ) + + assert ( + node.query( + f"SELECT * FROM s3('{minio_endpoint}', 'minio', 'minio123', 'CSV') FORMAT Values" + ) + == "('color','red'),('size','10')" + ) + + assert ( + node.query( + f"SELECT * FROM s3('{minio_endpoint}', 'minio', 'minio123', 'CSV') FORMAT Values" + ) + == "('color','red'),('size','10')" + ) + + +def simple_test(cluster, proxies, protocol, bucket): + minio_endpoint = build_s3_endpoint(protocol, bucket) + node = cluster.instances[f"{bucket}"] + + perform_simple_queries(node, minio_endpoint) + + for proxy in proxies: + check_proxy_logs(cluster, proxy, protocol, bucket) diff --git a/tests/integration/test_s3_with_proxy/__init__.py b/tests/integration/test_s3_storage_conf_new_proxy/__init__.py similarity index 100% rename from tests/integration/test_s3_with_proxy/__init__.py rename to tests/integration/test_s3_storage_conf_new_proxy/__init__.py diff --git a/tests/integration/test_s3_storage_conf_new_proxy/configs/config.d/proxy_list.xml b/tests/integration/test_s3_storage_conf_new_proxy/configs/config.d/proxy_list.xml new file mode 100644 index 00000000000..af5687d88ac --- /dev/null +++ b/tests/integration/test_s3_storage_conf_new_proxy/configs/config.d/proxy_list.xml @@ -0,0 +1,8 @@ + + + + http://proxy1 + http://proxy2 + + + \ No newline at end of file diff --git 
a/tests/integration/test_s3_storage_conf_new_proxy/configs/config.d/storage_conf.xml b/tests/integration/test_s3_storage_conf_new_proxy/configs/config.d/storage_conf.xml new file mode 100644 index 00000000000..94ac83b32ac --- /dev/null +++ b/tests/integration/test_s3_storage_conf_new_proxy/configs/config.d/storage_conf.xml @@ -0,0 +1,21 @@ + + + + + s3 + http://minio1:9001/root/data/ + minio + minio123 + + + + + +
+ s3 +
+
+
+
+
+
diff --git a/tests/integration/test_s3_with_proxy/configs/config.d/users.xml b/tests/integration/test_s3_storage_conf_new_proxy/configs/config.d/users.xml similarity index 100% rename from tests/integration/test_s3_with_proxy/configs/config.d/users.xml rename to tests/integration/test_s3_storage_conf_new_proxy/configs/config.d/users.xml diff --git a/tests/integration/test_s3_storage_conf_new_proxy/configs/config.xml b/tests/integration/test_s3_storage_conf_new_proxy/configs/config.xml new file mode 100644 index 00000000000..f4be5ab6b7c --- /dev/null +++ b/tests/integration/test_s3_storage_conf_new_proxy/configs/config.xml @@ -0,0 +1,7 @@ + + 9000 + 127.0.0.1 + 500 + ./clickhouse/ + users.xml + diff --git a/tests/integration/test_s3_with_proxy/proxy-resolver/resolver.py b/tests/integration/test_s3_storage_conf_new_proxy/proxy-resolver/resolver.py similarity index 100% rename from tests/integration/test_s3_with_proxy/proxy-resolver/resolver.py rename to tests/integration/test_s3_storage_conf_new_proxy/proxy-resolver/resolver.py diff --git a/tests/integration/test_s3_storage_conf_new_proxy/test.py b/tests/integration/test_s3_storage_conf_new_proxy/test.py new file mode 100644 index 00000000000..c98eb05a217 --- /dev/null +++ b/tests/integration/test_s3_storage_conf_new_proxy/test.py @@ -0,0 +1,66 @@ +import logging +import time + +import pytest +from helpers.cluster import ClickHouseCluster + + +@pytest.fixture(scope="module") +def cluster(): + try: + cluster = ClickHouseCluster(__file__) + cluster.add_instance( + "node", + main_configs=[ + "configs/config.d/storage_conf.xml", + "configs/config.d/proxy_list.xml", + ], + with_minio=True, + ) + logging.info("Starting cluster...") + cluster.start() + logging.info("Cluster started") + + yield cluster + finally: + cluster.shutdown() + + +def check_proxy_logs(cluster, proxy_instance, http_methods={"POST", "PUT", "GET"}): + for i in range(10): + logs = cluster.get_container_logs(proxy_instance) + # Check with retry that all 
possible interactions with Minio are present + for http_method in http_methods: + if logs.find(http_method + " http://minio1") >= 0: + return + time.sleep(1) + else: + assert False, f"{http_methods} method not found in logs of {proxy_instance}" + + +@pytest.mark.parametrize("policy", ["s3"]) +def test_s3_with_proxy_list(cluster, policy): + node = cluster.instances["node"] + + node.query( + """ + CREATE TABLE s3_test ( + id Int64, + data String + ) ENGINE=MergeTree() + ORDER BY id + SETTINGS storage_policy='{}' + """.format( + policy + ) + ) + node.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')") + assert ( + node.query("SELECT * FROM s3_test order by id FORMAT Values") + == "(0,'data'),(1,'data')" + ) + + node.query("DROP TABLE IF EXISTS s3_test SYNC") + + for proxy in ["proxy1", "proxy2"]: + check_proxy_logs(cluster, proxy, ["PUT", "GET"]) diff --git a/tests/integration/test_s3_storage_conf_proxy/__init__.py b/tests/integration/test_s3_storage_conf_proxy/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_s3_with_proxy/configs/config.d/storage_conf.xml b/tests/integration/test_s3_storage_conf_proxy/configs/config.d/storage_conf.xml similarity index 100% rename from tests/integration/test_s3_with_proxy/configs/config.d/storage_conf.xml rename to tests/integration/test_s3_storage_conf_proxy/configs/config.d/storage_conf.xml diff --git a/tests/integration/test_s3_storage_conf_proxy/configs/config.d/users.xml b/tests/integration/test_s3_storage_conf_proxy/configs/config.d/users.xml new file mode 100644 index 00000000000..0011583a68c --- /dev/null +++ b/tests/integration/test_s3_storage_conf_proxy/configs/config.d/users.xml @@ -0,0 +1,5 @@ + + + + + diff --git a/tests/integration/test_s3_storage_conf_proxy/configs/config.xml b/tests/integration/test_s3_storage_conf_proxy/configs/config.xml new file mode 100644 index 00000000000..f4be5ab6b7c --- /dev/null +++ 
b/tests/integration/test_s3_storage_conf_proxy/configs/config.xml @@ -0,0 +1,7 @@ + + 9000 + 127.0.0.1 + 500 + ./clickhouse/ + users.xml + diff --git a/tests/integration/test_s3_storage_conf_proxy/proxy-resolver/resolver.py b/tests/integration/test_s3_storage_conf_proxy/proxy-resolver/resolver.py new file mode 100644 index 00000000000..eaea4c1dab2 --- /dev/null +++ b/tests/integration/test_s3_storage_conf_proxy/proxy-resolver/resolver.py @@ -0,0 +1,14 @@ +import random + +import bottle + + +@bottle.route("/hostname") +def index(): + if random.randrange(2) == 0: + return "proxy1" + else: + return "proxy2" + + +bottle.run(host="0.0.0.0", port=8080) diff --git a/tests/integration/test_s3_with_proxy/test.py b/tests/integration/test_s3_storage_conf_proxy/test.py similarity index 76% rename from tests/integration/test_s3_with_proxy/test.py rename to tests/integration/test_s3_storage_conf_proxy/test.py index e5624d4e056..6cf612f8259 100644 --- a/tests/integration/test_s3_with_proxy/test.py +++ b/tests/integration/test_s3_storage_conf_proxy/test.py @@ -4,18 +4,7 @@ import time import pytest from helpers.cluster import ClickHouseCluster - - -# Runs simple proxy resolver in python env container. 
-def run_resolver(cluster): - container_id = cluster.get_container_id("resolver") - current_dir = os.path.dirname(__file__) - cluster.copy_file_to_container( - container_id, - os.path.join(current_dir, "proxy-resolver", "resolver.py"), - "resolver.py", - ) - cluster.exec_in_container(container_id, ["python", "resolver.py"], detach=True) +import helpers.s3_url_proxy_tests_util as proxy_util @pytest.fixture(scope="module") @@ -29,7 +18,7 @@ def cluster(): cluster.start() logging.info("Cluster started") - run_resolver(cluster) + proxy_util.run_resolver(cluster, os.path.dirname(__file__)) logging.info("Proxy resolver started") yield cluster @@ -46,7 +35,7 @@ def check_proxy_logs(cluster, proxy_instance, http_methods={"POST", "PUT", "GET" return time.sleep(1) else: - assert False, "http method not found in logs" + assert False, f"{http_methods} method not found in logs of {proxy_instance}" @pytest.mark.parametrize("policy", ["s3", "s3_with_resolver"]) @@ -65,7 +54,6 @@ def test_s3_with_proxy_list(cluster, policy): policy ) ) - node.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')") assert ( node.query("SELECT * FROM s3_test order by id FORMAT Values") diff --git a/tests/integration/test_s3_table_function_with_http_proxy/__init__.py b/tests/integration/test_s3_table_function_with_http_proxy/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_s3_table_function_with_http_proxy/configs/config.d/proxy_list.xml b/tests/integration/test_s3_table_function_with_http_proxy/configs/config.d/proxy_list.xml new file mode 100644 index 00000000000..af5687d88ac --- /dev/null +++ b/tests/integration/test_s3_table_function_with_http_proxy/configs/config.d/proxy_list.xml @@ -0,0 +1,8 @@ + + + + http://proxy1 + http://proxy2 + + + \ No newline at end of file diff --git a/tests/integration/test_s3_table_function_with_http_proxy/configs/config.d/proxy_remote.xml 
b/tests/integration/test_s3_table_function_with_http_proxy/configs/config.d/proxy_remote.xml new file mode 100644 index 00000000000..30d99605458 --- /dev/null +++ b/tests/integration/test_s3_table_function_with_http_proxy/configs/config.d/proxy_remote.xml @@ -0,0 +1,15 @@ + + + + + http://resolver:8080/hostname + http + 80 + 10 + + + diff --git a/tests/integration/test_s3_table_function_with_http_proxy/proxy-resolver/resolver.py b/tests/integration/test_s3_table_function_with_http_proxy/proxy-resolver/resolver.py new file mode 100644 index 00000000000..8c7611303b8 --- /dev/null +++ b/tests/integration/test_s3_table_function_with_http_proxy/proxy-resolver/resolver.py @@ -0,0 +1,11 @@ +import random + +import bottle + + +@bottle.route("/hostname") +def index(): + return "proxy1" + + +bottle.run(host="0.0.0.0", port=8080) diff --git a/tests/integration/test_s3_table_function_with_http_proxy/test.py b/tests/integration/test_s3_table_function_with_http_proxy/test.py new file mode 100644 index 00000000000..8751b8f3e99 --- /dev/null +++ b/tests/integration/test_s3_table_function_with_http_proxy/test.py @@ -0,0 +1,59 @@ +import logging +import helpers.s3_url_proxy_tests_util as proxy_util +import os + +import pytest +from helpers.cluster import ClickHouseCluster + + +@pytest.fixture(scope="module") +def cluster(): + try: + cluster = ClickHouseCluster(__file__) + + cluster.add_instance( + "remote_proxy_node", + main_configs=[ + "configs/config.d/proxy_remote.xml", + ], + with_minio=True, + ) + + cluster.add_instance( + "proxy_list_node", + main_configs=[ + "configs/config.d/proxy_list.xml", + ], + with_minio=True, + ) + + cluster.add_instance( + "env_node", + with_minio=True, + env_variables={ + "http_proxy": "http://proxy1", + }, + ) + + logging.info("Starting cluster...") + cluster.start() + logging.info("Cluster started") + + proxy_util.run_resolver(cluster, os.path.dirname(__file__)) + logging.info("Proxy resolver started") + + yield cluster + finally: + 
cluster.shutdown() + + +def test_s3_with_http_proxy_list(cluster): + proxy_util.simple_test(cluster, ["proxy1", "proxy2"], "http", "proxy_list_node") + + +def test_s3_with_http_remote_proxy(cluster): + proxy_util.simple_test(cluster, ["proxy1"], "http", "remote_proxy_node") + + +def test_s3_with_http_env_proxy(cluster): + proxy_util.simple_test(cluster, ["proxy1"], "http", "env_node") diff --git a/tests/integration/test_s3_table_function_with_https_proxy/__init__.py b/tests/integration/test_s3_table_function_with_https_proxy/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_s3_table_function_with_https_proxy/configs/config.d/proxy_list.xml b/tests/integration/test_s3_table_function_with_https_proxy/configs/config.d/proxy_list.xml new file mode 100644 index 00000000000..9f2724d78b4 --- /dev/null +++ b/tests/integration/test_s3_table_function_with_https_proxy/configs/config.d/proxy_list.xml @@ -0,0 +1,12 @@ + + + + http://proxy1 + http://proxy2 + + + https://proxy1 + https://proxy2 + + + \ No newline at end of file diff --git a/tests/integration/test_s3_table_function_with_https_proxy/configs/config.d/proxy_remote.xml b/tests/integration/test_s3_table_function_with_https_proxy/configs/config.d/proxy_remote.xml new file mode 100644 index 00000000000..c0f5975224d --- /dev/null +++ b/tests/integration/test_s3_table_function_with_https_proxy/configs/config.d/proxy_remote.xml @@ -0,0 +1,22 @@ + + + + + http://resolver:8080/hostname + http + 80 + 10 + + + + http://resolver:8080/hostname + https + 443 + 10 + + + diff --git a/tests/integration/test_s3_table_function_with_https_proxy/configs/config.d/ssl.xml b/tests/integration/test_s3_table_function_with_https_proxy/configs/config.d/ssl.xml new file mode 100644 index 00000000000..d4cea955c68 --- /dev/null +++ b/tests/integration/test_s3_table_function_with_https_proxy/configs/config.d/ssl.xml @@ -0,0 +1,11 @@ + + + + true + none + + AcceptCertificateHandler + + + + diff --git 
a/tests/integration/test_s3_table_function_with_https_proxy/configs/config.d/users.xml b/tests/integration/test_s3_table_function_with_https_proxy/configs/config.d/users.xml new file mode 100644 index 00000000000..0011583a68c --- /dev/null +++ b/tests/integration/test_s3_table_function_with_https_proxy/configs/config.d/users.xml @@ -0,0 +1,5 @@ + + + + + diff --git a/tests/integration/test_s3_with_proxy/configs/config.xml b/tests/integration/test_s3_table_function_with_https_proxy/configs/config.xml similarity index 100% rename from tests/integration/test_s3_with_proxy/configs/config.xml rename to tests/integration/test_s3_table_function_with_https_proxy/configs/config.xml diff --git a/tests/integration/test_s3_table_function_with_https_proxy/minio_certs/CAs/public.crt b/tests/integration/test_s3_table_function_with_https_proxy/minio_certs/CAs/public.crt new file mode 100644 index 00000000000..7f87261aea8 --- /dev/null +++ b/tests/integration/test_s3_table_function_with_https_proxy/minio_certs/CAs/public.crt @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC+TCCAeGgAwIBAgIQfF4j70ZdR/W3XlFJq5iZgDANBgkqhkiG9w0BAQsFADAS +MRAwDgYDVQQKEwdBY21lIENvMB4XDTIwMDcwOTE1MTQ1M1oXDTIxMDcwOTE1MTQ1 +M1owEjEQMA4GA1UEChMHQWNtZSBDbzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC +AQoCggEBAM4i2tOlbbDxcvckVK/Zms95n2ipr7dZ0qToSf8qmF5d2EH6mqC0Vv2d +MJ+8JhQEKBh8AvUjmSqjd8tWmLJcqA84Gc8s8stB565wwkaMBvMExKlO+PQtynRd +xZjQVnj16hB0ZP4JHeVUOqMQa7uPQZQp6kqdkJ3u84EhRmU8fCCtUPOJIYHcfx7P +ScYfmJCpmqxrfWP18XcyYlhoCTm/nV+XT+XfUGwc6Sok5pCX5C70PiQ5MrEvYDIC +Q3iRNi2Lj4pTG8GUSwAcKLB08o7mxHvR1MGDGohtGnSAhdniK9aljNmBQfNIErFI +3529YDMW/qwRKSEkJpMy7r8RkfYamUsCAwEAAaNLMEkwDgYDVR0PAQH/BAQDAgKk +MBMGA1UdJQQMMAoGCCsGAQUFBwMBMA8GA1UdEwEB/wQFMAMBAf8wEQYDVR0RBAow +CIIGbWluaW8xMA0GCSqGSIb3DQEBCwUAA4IBAQDAlDKuJfQHzsBtFioNnf9VS+LA +m/aUG7ngl0Orynef45Kl21l1ToM0feKk1UnEN8crwE0wjZttby84sYaq0us7mnbl +CnFsvphNqk5jxMbSs/8Vx4TVEimyO7r5JeG4J9lEnAu2hKK5ZlwPzj7G8bL4fOvk +OGiiP5r0K3wTVU/Y96MmDUaJwBNiyp7WtsBRzkteSPQJDC98gUCYeYsIFokUs3gz 
+ILOAbGQBLKUn9kmYc+/LLNha0nsC0eQGmLaJgIYfele63c6KkklQ3ePjRZ71JfmN +TulovRrwUf0J4hYcIgC1URZbClsnQDOBFCY6Lm8eI+IGNWWU4I9WGoJ1Lkvk +-----END CERTIFICATE----- diff --git a/tests/integration/test_s3_table_function_with_https_proxy/minio_certs/private.key b/tests/integration/test_s3_table_function_with_https_proxy/minio_certs/private.key new file mode 100644 index 00000000000..28a0f4bfde7 --- /dev/null +++ b/tests/integration/test_s3_table_function_with_https_proxy/minio_certs/private.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC9ORgaBCx42ejp +9PSjc0uvwH/hTB6yZvZB4S+wxbzzfeKomX/JBcFHmGCIJJVjVV0rafv3vw+9f9u4 +wrZpN4HZKnVyz3mBXEA1WDvLTLV8n8zVyso1qbnfF9Fa8wnk89b0xGWyM7jie7/c +TIGMrgm7hIPaM2zDzFwIfIAqZ1AexC4vADIffF9rcFLLjNHuv1uAc32jdfQEPluv +mBMzGkz254+MabxZWIZjkYn70kNSZDoyFmMGafBtkRTUPNq2+fGv/eLJ9Lxm3153 +Ja0sCyzLlEo9+/z4ERqM5zwWre4vcwfO63c5pcSCzGw84teTpmDwSyiSR70TYJdt +BGQqZvLZAgMBAAECggEANe8oJ4I5CtlRwh3H/S7Hy/iaeqUvuroORwjghwpVqTGg +gV3/RlUVmkqceTG0QvP58n3rC9qxqdnfzvHw/FyN7lBj2a25fF3HD21u3aunrzX9 +NJLwwAr4p9YqHjpX/6JhCrNQKVMEx8luDmTgKDETJRfIXVF7FvQQ53pVLcD03U+g +MgN61HBzfT5L0TLHoiKNQbVi+Wm1gw3zvb/a9Z1rULRZfIuKGM0bNNqRZt4rUUAV +QicklDR0Qv59jhr5Y/zjinKkqF8qudvUkaNT2JH1DLfXiAhuC0OQugMjYzNntQB4 +hMhkqARnjuk/WPMvnXivnqx9o69BL5wyXIj3vD4fgQKBgQDVKaXAZJ5bo3VfcpLm +cyjtUuOzAxLU1bVGI0Hm1ARqeGVxSTypZLSX8xFi2n5Bvbgh/Y60aEac/1uKoXA9 +gej1MT4hKpXyagrARx97E8zk5nf88kVxkiKUrifMjP2lDzHIYhdKk9R3SiV6gWvA +FoJtjBwFhJ6uWUPyry4nqFSENQKBgQDjP9k6CTZF0EnDqbADiQr7VKpebqhtLWRD +U0bQh/l57VrWqGksVOlivIJChP49q1H+hQ1YgfKIEDag8JJnf/inUSpVsw1ljAjv +knqNzn0Gdd9lTsiNGgqlCjhmWedkh4eO8uau479TwQc6gB4PQdLAFynQtt8Kk45P +GxdpRx4AlQKBgQCgxUGbYwhBC37aF1sObqrenBbajCXm2qxXEv6Ab0ZJWzb/g4I6 +LJc8x3pEeZCiWsoG8Otxy/f+L2bGn049Rb8DNzmp4Cmp5SrorHvk4yE1P1IeOEgC +CXsFcnjYATrJBDXC8aCpgefMdOLhi71N6mxC3VrBGq5nxzHFVzTTelUMRQKBgQDa +yekhiCb5liy+tcuhy7qH+Z7BpjaATrh+XVoLgS5+5jeT/basmN/OUQH0e0iwJRaf +Poh30zynJT0DPDsobLwAkxN4SRg30Vf1GAjoKIqUwr2fMvfBafYfqbRdTmeKkTXB 
+OjlA3kKhp3GHMDxAojX+/Q4kRTx+WUwk+0dR88d99QKBgEiYrkSLjKXUFllDmVyp +HtlYKZiq5c33DA06SA2uVOprCdTbnbvP4WrgUsLGvqBcaPEd06fGGbvJWwUdnkXM +HNAkqSeUe5ueovidtoPdF+aPyxdGg3Z8551xOoHZFYrvgdZ4YMPcJrwQQsvWCcYP +GDnSoD8Xjd2LmekTpDBt5ZVz +-----END PRIVATE KEY----- diff --git a/tests/integration/test_s3_table_function_with_https_proxy/minio_certs/public.crt b/tests/integration/test_s3_table_function_with_https_proxy/minio_certs/public.crt new file mode 100644 index 00000000000..0d0992c8f5b --- /dev/null +++ b/tests/integration/test_s3_table_function_with_https_proxy/minio_certs/public.crt @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDBTCCAe2gAwIBAgIRANb2pr4HgR8YFwKNJMUSWiIwDQYJKoZIhvcNAQELBQAw +EjEQMA4GA1UEChMHQWNtZSBDbzAeFw0yMDA3MDkxODE1MDBaFw0yMTA3MDkxODE1 +MDBaMBIxEDAOBgNVBAoTB0FjbWUgQ28wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw +ggEKAoIBAQC9ORgaBCx42ejp9PSjc0uvwH/hTB6yZvZB4S+wxbzzfeKomX/JBcFH +mGCIJJVjVV0rafv3vw+9f9u4wrZpN4HZKnVyz3mBXEA1WDvLTLV8n8zVyso1qbnf +F9Fa8wnk89b0xGWyM7jie7/cTIGMrgm7hIPaM2zDzFwIfIAqZ1AexC4vADIffF9r +cFLLjNHuv1uAc32jdfQEPluvmBMzGkz254+MabxZWIZjkYn70kNSZDoyFmMGafBt +kRTUPNq2+fGv/eLJ9Lxm3153Ja0sCyzLlEo9+/z4ERqM5zwWre4vcwfO63c5pcSC +zGw84teTpmDwSyiSR70TYJdtBGQqZvLZAgMBAAGjVjBUMA4GA1UdDwEB/wQEAwIC +pDATBgNVHSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MBwGA1UdEQQV +MBOCBm1pbmlvMYIJbG9jYWxob3N0MA0GCSqGSIb3DQEBCwUAA4IBAQAKU2LhvFFz +RFfUibt/WTj3rtUfKEBrQuUOYt2A8MTbC8pyEu+UJASTzunluUFze5zchEm1s3pZ +YRLcNwbJqLE6CzUxQ9b2iUhaeWuKrx4ZoPkY0uGiaXM/iKfVKTuNmhF2Sf/P4xUE +Pt19yQjpIhcicWQc37BBQFvnvy+n5wgHa/pgl1+QUvAa/fwYhF9S28xRLESzZepm +NMYysopV+YMaxcFa9SH44toXtXnvRWwVdEorlq1W3/AiJg8hDPzSa9UXLMjA968J +ONtn3qvwac9Ot53+QsXJdsMmDZLWGCi6I1w0ZQetpr/0ubaA1F3GdK9eB/S0thqU +l2VUgn3c/kKS +-----END CERTIFICATE----- diff --git a/tests/integration/test_s3_table_function_with_https_proxy/proxy-resolver/resolver.py b/tests/integration/test_s3_table_function_with_https_proxy/proxy-resolver/resolver.py new file mode 100644 index 00000000000..8c7611303b8 --- /dev/null +++ 
b/tests/integration/test_s3_table_function_with_https_proxy/proxy-resolver/resolver.py @@ -0,0 +1,11 @@ +import random + +import bottle + + +@bottle.route("/hostname") +def index(): + return "proxy1" + + +bottle.run(host="0.0.0.0", port=8080) diff --git a/tests/integration/test_s3_table_function_with_https_proxy/test.py b/tests/integration/test_s3_table_function_with_https_proxy/test.py new file mode 100644 index 00000000000..a498410a4d4 --- /dev/null +++ b/tests/integration/test_s3_table_function_with_https_proxy/test.py @@ -0,0 +1,67 @@ +import logging +import helpers.s3_url_proxy_tests_util as proxy_util +import os + +import pytest +from helpers.cluster import ClickHouseCluster + + +@pytest.fixture(scope="module") +def cluster(): + try: + cluster = ClickHouseCluster(__file__) + + # minio_certs_dir is set only once and used by all instances + + cluster.add_instance( + "remote_proxy_node", + main_configs=[ + "configs/config.d/proxy_remote.xml", + "configs/config.d/ssl.xml", + ], + with_minio=True, + minio_certs_dir="minio_certs", + ) + + cluster.add_instance( + "proxy_list_node", + main_configs=[ + "configs/config.d/proxy_list.xml", + "configs/config.d/ssl.xml", + ], + with_minio=True, + ) + + cluster.add_instance( + "env_node", + main_configs=[ + "configs/config.d/ssl.xml", + ], + with_minio=True, + env_variables={ + "https_proxy": "https://proxy1", + }, + ) + + logging.info("Starting cluster...") + cluster.start() + logging.info("Cluster started") + + proxy_util.run_resolver(cluster, os.path.dirname(__file__)) + logging.info("Proxy resolver started") + + yield cluster + finally: + cluster.shutdown() + + +def test_s3_with_https_proxy_list(cluster): + proxy_util.simple_test(cluster, ["proxy1", "proxy2"], "https", "proxy_list_node") + + +def test_s3_with_https_remote_proxy(cluster): + proxy_util.simple_test(cluster, ["proxy1"], "https", "remote_proxy_node") + + +def test_s3_with_https_env_proxy(cluster): + proxy_util.simple_test(cluster, ["proxy1"], "https", 
"env_node") diff --git a/tests/integration/test_storage_url_with_proxy/__init__.py b/tests/integration/test_storage_url_with_proxy/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_storage_url_with_proxy/configs/config.d/proxy_list.xml b/tests/integration/test_storage_url_with_proxy/configs/config.d/proxy_list.xml new file mode 100644 index 00000000000..ff207e7166c --- /dev/null +++ b/tests/integration/test_storage_url_with_proxy/configs/config.d/proxy_list.xml @@ -0,0 +1,7 @@ + + + + http://proxy1 + + + \ No newline at end of file diff --git a/tests/integration/test_storage_url_with_proxy/test.py b/tests/integration/test_storage_url_with_proxy/test.py new file mode 100644 index 00000000000..107aa426836 --- /dev/null +++ b/tests/integration/test_storage_url_with_proxy/test.py @@ -0,0 +1,84 @@ +import logging +import time +from datetime import datetime +import hmac +import hashlib +import base64 + +import pytest +from helpers.cluster import ClickHouseCluster + + +@pytest.fixture(scope="module") +def cluster(): + try: + cluster = ClickHouseCluster(__file__) + + cluster.add_instance( + "proxy_list_node", + main_configs=["configs/config.d/proxy_list.xml"], + with_minio=True, + ) + + logging.info("Starting cluster...") + cluster.start() + logging.info("Cluster started") + + yield cluster + finally: + cluster.shutdown() + + +def check_proxy_logs(cluster, proxy_instance, http_methods): + minio_ip = cluster.get_instance_ip("minio1") + for i in range(10): + logs = cluster.get_container_logs(proxy_instance) + # Check with retry that all possible interactions with Minio are present + for http_method in http_methods: + method_with_domain = http_method + " http://minio1" + method_with_ip = http_method + f" http://{minio_ip}" + + logging.info(f"Method with ip: {method_with_ip}") + + has_get_minio_logs = ( + logs.find(method_with_domain) >= 0 or logs.find(method_with_ip) >= 0 + ) + if has_get_minio_logs: + return + time.sleep(1) + 
else: + assert False, "http method not found in logs" + + +def test_s3_with_proxy_list(cluster): + node = cluster.instances["proxy_list_node"] + + # insert into function url uses POST and minio expects PUT + node.query( + """ + INSERT INTO FUNCTION + s3('http://minio1:9001/root/data/ch-proxy-test/test.csv', 'minio', 'minio123', 'CSV', 'key String, value String') + VALUES ('color','red'),('size','10') + """ + ) + + content_type = "application/zstd" + date = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S +0000") + resource = "/root/data/ch-proxy-test/test.csv" + get_sig_string = f"GET\n\n{content_type}\n{date}\n{resource}" + password = "minio123" + + get_digest = hmac.new( + password.encode("utf-8"), get_sig_string.encode("utf-8"), hashlib.sha1 + ).digest() + get_signature = base64.b64encode(get_digest).decode("utf-8") + assert ( + node.query( + "SELECT * FROM url('http://minio1:9001/root/data/ch-proxy-test/test.csv', 'CSV', 'a String, b String'," + f"headers('Host'='minio1', 'Date'= '{date}', 'Content-Type'='{content_type}'," + f"'Authorization'='AWS minio:{get_signature}')) FORMAT Values" + ) + == "('color','red'),('size','10')" + ) + + check_proxy_logs(cluster, "proxy1", ["GET"])