Merge branch 'master' into ci_py_small_refactoring

Max K 2024-06-04 19:30:37 +02:00
commit 443d06328f
25 changed files with 332 additions and 185 deletions

View File

@@ -43,12 +43,13 @@ namespace
endpoint,
proxy_scheme,
proxy_port,
cache_ttl
std::chrono::seconds {cache_ttl}
};
return std::make_shared<RemoteProxyConfigurationResolver>(
server_configuration,
request_protocol,
std::make_shared<RemoteProxyHostFetcherImpl>(),
isTunnelingDisabledForHTTPSRequestsOverHTTPProxy(configuration));
}

View File

@@ -6,22 +6,47 @@
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Common/logger_useful.h>
#include <Common/DNSResolver.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
}
std::string RemoteProxyHostFetcherImpl::fetch(const Poco::URI & endpoint, const ConnectionTimeouts & timeouts)
{
auto request = Poco::Net::HTTPRequest(Poco::Net::HTTPRequest::HTTP_GET, endpoint.getPath(), Poco::Net::HTTPRequest::HTTP_1_1);
auto session = makeHTTPSession(HTTPConnectionGroupType::HTTP, endpoint, timeouts);
session->sendRequest(request);
Poco::Net::HTTPResponse response;
auto & response_body_stream = session->receiveResponse(response);
if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK)
throw HTTPException(
ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER,
endpoint.toString(),
response.getStatus(),
response.getReason(),
"");
std::string proxy_host;
Poco::StreamCopier::copyToString(response_body_stream, proxy_host);
return proxy_host;
}
RemoteProxyConfigurationResolver::RemoteProxyConfigurationResolver(
const RemoteServerConfiguration & remote_server_configuration_,
Protocol request_protocol_,
std::shared_ptr<RemoteProxyHostFetcher> fetcher_,
bool disable_tunneling_for_https_requests_over_http_proxy_
)
: ProxyConfigurationResolver(request_protocol_, disable_tunneling_for_https_requests_over_http_proxy_), remote_server_configuration(remote_server_configuration_)
: ProxyConfigurationResolver(request_protocol_, disable_tunneling_for_https_requests_over_http_proxy_),
remote_server_configuration(remote_server_configuration_), fetcher(fetcher_)
{
}
@@ -29,9 +54,7 @@ ProxyConfiguration RemoteProxyConfigurationResolver::resolve()
{
auto logger = getLogger("RemoteProxyConfigurationResolver");
auto & [endpoint, proxy_protocol, proxy_port, cache_ttl_] = remote_server_configuration;
LOG_DEBUG(logger, "Obtain proxy using resolver: {}", endpoint.toString());
auto & [endpoint, proxy_protocol_string, proxy_port, cache_ttl] = remote_server_configuration;
std::lock_guard lock(cache_mutex);
@@ -55,66 +78,26 @@ ProxyConfiguration RemoteProxyConfigurationResolver::resolve()
.withSendTimeout(1)
.withReceiveTimeout(1);
try
{
/// It should be just empty GET request.
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, endpoint.getPath(), Poco::Net::HTTPRequest::HTTP_1_1);
const auto proxy_host = fetcher->fetch(endpoint, timeouts);
const auto & host = endpoint.getHost();
auto resolved_hosts = DNSResolver::instance().resolveHostAll(host);
LOG_DEBUG(logger, "Use proxy: {}://{}:{}", proxy_protocol_string, proxy_host, proxy_port);
HTTPSessionPtr session;
auto proxy_protocol = ProxyConfiguration::protocolFromString(proxy_protocol_string);
for (size_t i = 0; i < resolved_hosts.size(); ++i)
{
auto resolved_endpoint = endpoint;
resolved_endpoint.setHost(resolved_hosts[i].toString());
session = makeHTTPSession(HTTPConnectionGroupType::HTTP, resolved_endpoint, timeouts);
bool use_tunneling_for_https_requests_over_http_proxy = useTunneling(
request_protocol,
proxy_protocol,
disable_tunneling_for_https_requests_over_http_proxy);
try
{
session->sendRequest(request);
break;
}
catch (...)
{
if (i + 1 == resolved_hosts.size())
throw;
}
}
cached_config.protocol = proxy_protocol;
cached_config.host = proxy_host;
cached_config.port = proxy_port;
cached_config.tunneling = use_tunneling_for_https_requests_over_http_proxy;
cached_config.original_request_protocol = request_protocol;
cache_timestamp = std::chrono::system_clock::now();
cache_valid = true;
Poco::Net::HTTPResponse response;
auto & response_body_stream = session->receiveResponse(response);
if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Proxy resolver returned not OK status: {}", response.getReason());
String proxy_host;
/// Read proxy host as string from response body.
Poco::StreamCopier::copyToString(response_body_stream, proxy_host);
LOG_DEBUG(logger, "Use proxy: {}://{}:{}", proxy_protocol, proxy_host, proxy_port);
bool use_tunneling_for_https_requests_over_http_proxy = useTunneling(
request_protocol,
cached_config.protocol,
disable_tunneling_for_https_requests_over_http_proxy);
cached_config.protocol = ProxyConfiguration::protocolFromString(proxy_protocol);
cached_config.host = proxy_host;
cached_config.port = proxy_port;
cached_config.tunneling = use_tunneling_for_https_requests_over_http_proxy;
cached_config.original_request_protocol = request_protocol;
cache_timestamp = std::chrono::system_clock::now();
cache_valid = true;
return cached_config;
}
catch (...)
{
tryLogCurrentException("RemoteProxyConfigurationResolver", "Failed to obtain proxy");
return {};
}
return cached_config;
}
void RemoteProxyConfigurationResolver::errorReport(const ProxyConfiguration & config)
@@ -124,7 +107,7 @@ void RemoteProxyConfigurationResolver::errorReport(const ProxyConfiguration & co
std::lock_guard lock(cache_mutex);
if (!cache_ttl.count() || !cache_valid)
if (!remote_server_configuration.cache_ttl_.count() || !cache_valid)
return;
if (std::tie(cached_config.protocol, cached_config.host, cached_config.port)

View File

@@ -10,6 +10,19 @@
namespace DB
{
struct ConnectionTimeouts;
struct RemoteProxyHostFetcher
{
virtual ~RemoteProxyHostFetcher() = default;
virtual std::string fetch(const Poco::URI & endpoint, const ConnectionTimeouts & timeouts) = 0;
};
struct RemoteProxyHostFetcherImpl : public RemoteProxyHostFetcher
{
std::string fetch(const Poco::URI & endpoint, const ConnectionTimeouts & timeouts) override;
};
/*
* Makes an HTTP GET request to the specified endpoint to obtain a proxy host.
* */
@@ -22,13 +35,14 @@ public:
Poco::URI endpoint;
String proxy_protocol;
unsigned proxy_port;
unsigned cache_ttl_;
const std::chrono::seconds cache_ttl_;
};
RemoteProxyConfigurationResolver(
const RemoteServerConfiguration & remote_server_configuration_,
Protocol request_protocol_,
bool disable_tunneling_for_https_requests_over_http_proxy_ = true);
std::shared_ptr<RemoteProxyHostFetcher> fetcher_,
bool disable_tunneling_for_https_requests_over_http_proxy_ = false);
ProxyConfiguration resolve() override;
@@ -36,11 +50,11 @@ public:
private:
RemoteServerConfiguration remote_server_configuration;
std::shared_ptr<RemoteProxyHostFetcher> fetcher;
std::mutex cache_mutex;
bool cache_valid = false;
std::chrono::time_point<std::chrono::system_clock> cache_timestamp;
const std::chrono::seconds cache_ttl{0};
ProxyConfiguration cached_config;
};

View File

@@ -0,0 +1,172 @@
#include <gtest/gtest.h>
#include <Common/RemoteProxyConfigurationResolver.h>
#include <Poco/URI.h>
#include <IO/ConnectionTimeouts.h>
#include <base/sleep.h>
namespace
{
struct RemoteProxyHostFetcherMock : public DB::RemoteProxyHostFetcher
{
explicit RemoteProxyHostFetcherMock(const std::string & return_mock_) : return_mock(return_mock_) {}
std::string fetch(const Poco::URI &, const DB::ConnectionTimeouts &) override
{
fetch_count++;
return return_mock;
}
std::string return_mock;
std::size_t fetch_count {0};
};
}
namespace DB
{
TEST(RemoteProxyConfigurationResolver, HTTPOverHTTP)
{
const char * proxy_server_mock = "proxy1";
auto remote_server_configuration = RemoteProxyConfigurationResolver::RemoteServerConfiguration
{
Poco::URI("not_important"),
"http",
80,
std::chrono::seconds {10}
};
RemoteProxyConfigurationResolver resolver(
remote_server_configuration,
ProxyConfiguration::Protocol::HTTP,
std::make_shared<RemoteProxyHostFetcherMock>(proxy_server_mock)
);
auto configuration = resolver.resolve();
ASSERT_EQ(configuration.host, proxy_server_mock);
ASSERT_EQ(configuration.port, 80);
ASSERT_EQ(configuration.protocol, ProxyConfiguration::Protocol::HTTP);
ASSERT_EQ(configuration.original_request_protocol, ProxyConfiguration::Protocol::HTTP);
ASSERT_EQ(configuration.tunneling, false);
}
TEST(RemoteProxyConfigurationResolver, HTTPSOverHTTPS)
{
const char * proxy_server_mock = "proxy1";
auto remote_server_configuration = RemoteProxyConfigurationResolver::RemoteServerConfiguration
{
Poco::URI("not_important"),
"https",
443,
std::chrono::seconds {10}
};
RemoteProxyConfigurationResolver resolver(
remote_server_configuration,
ProxyConfiguration::Protocol::HTTPS,
std::make_shared<RemoteProxyHostFetcherMock>(proxy_server_mock)
);
auto configuration = resolver.resolve();
ASSERT_EQ(configuration.host, proxy_server_mock);
ASSERT_EQ(configuration.port, 443);
ASSERT_EQ(configuration.protocol, ProxyConfiguration::Protocol::HTTPS);
ASSERT_EQ(configuration.original_request_protocol, ProxyConfiguration::Protocol::HTTPS);
// tunneling should not be used, https over https.
ASSERT_EQ(configuration.tunneling, false);
}
TEST(RemoteProxyConfigurationResolver, HTTPSOverHTTP)
{
const char * proxy_server_mock = "proxy1";
auto remote_server_configuration = RemoteProxyConfigurationResolver::RemoteServerConfiguration
{
Poco::URI("not_important"),
"http",
80,
std::chrono::seconds {10}
};
RemoteProxyConfigurationResolver resolver(
remote_server_configuration,
ProxyConfiguration::Protocol::HTTPS,
std::make_shared<RemoteProxyHostFetcherMock>(proxy_server_mock)
);
auto configuration = resolver.resolve();
ASSERT_EQ(configuration.host, proxy_server_mock);
ASSERT_EQ(configuration.port, 80);
ASSERT_EQ(configuration.protocol, ProxyConfiguration::Protocol::HTTP);
ASSERT_EQ(configuration.original_request_protocol, ProxyConfiguration::Protocol::HTTPS);
// tunneling should be used, https over http.
ASSERT_EQ(configuration.tunneling, true);
}
TEST(RemoteProxyConfigurationResolver, HTTPSOverHTTPNoTunneling)
{
const char * proxy_server_mock = "proxy1";
auto remote_server_configuration = RemoteProxyConfigurationResolver::RemoteServerConfiguration
{
Poco::URI("not_important"),
"http",
80,
std::chrono::seconds {10}
};
RemoteProxyConfigurationResolver resolver(
remote_server_configuration,
ProxyConfiguration::Protocol::HTTPS,
std::make_shared<RemoteProxyHostFetcherMock>(proxy_server_mock),
true /* disable_tunneling_for_https_requests_over_http_proxy_ */
);
auto configuration = resolver.resolve();
ASSERT_EQ(configuration.host, proxy_server_mock);
ASSERT_EQ(configuration.port, 80);
ASSERT_EQ(configuration.protocol, ProxyConfiguration::Protocol::HTTP);
ASSERT_EQ(configuration.original_request_protocol, ProxyConfiguration::Protocol::HTTPS);
// tunneling is explicitly disabled, so it should not be used even for https over http.
ASSERT_EQ(configuration.tunneling, false);
}
TEST(RemoteProxyConfigurationResolver, SimpleCacheTest)
{
const char * proxy_server_mock = "proxy1";
auto cache_ttl = 5u;
auto remote_server_configuration = RemoteProxyConfigurationResolver::RemoteServerConfiguration
{
Poco::URI("not_important"),
"http",
80,
std::chrono::seconds {cache_ttl}
};
auto fetcher_mock = std::make_shared<RemoteProxyHostFetcherMock>(proxy_server_mock);
RemoteProxyConfigurationResolver resolver(
remote_server_configuration,
ProxyConfiguration::Protocol::HTTP,
fetcher_mock
);
resolver.resolve();
resolver.resolve();
resolver.resolve();
ASSERT_EQ(fetcher_mock->fetch_count, 1u);
sleepForSeconds(cache_ttl * 2);
resolver.resolve();
ASSERT_EQ(fetcher_mock->fetch_count, 2);
}
}

View File

@@ -48,7 +48,7 @@ HTTPSessionPtr makeHTTPSession(
HTTPConnectionGroupType group,
const Poco::URI & uri,
const ConnectionTimeouts & timeouts,
ProxyConfiguration proxy_configuration)
const ProxyConfiguration & proxy_configuration)
{
auto connection_pool = HTTPConnectionPools::instance().getPool(group, uri, proxy_configuration);
return connection_pool->getConnection(timeouts);

View File

@@ -61,7 +61,7 @@ HTTPSessionPtr makeHTTPSession(
HTTPConnectionGroupType group,
const Poco::URI & uri,
const ConnectionTimeouts & timeouts,
ProxyConfiguration proxy_config = {}
const ProxyConfiguration & proxy_config = {}
);
bool isRedirect(Poco::Net::HTTPResponse::HTTPStatus status);

View File

@@ -305,8 +305,7 @@ void PocoHTTPClient::makeRequestInternal(
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const
{
const auto request_configuration = per_request_configuration();
makeRequestInternalImpl(request, request_configuration, response, readLimiter, writeLimiter);
makeRequestInternalImpl(request, response, readLimiter, writeLimiter);
}
String getMethod(const Aws::Http::HttpRequest & request)
@@ -330,7 +329,6 @@ String getMethod(const Aws::Http::HttpRequest & request)
void PocoHTTPClient::makeRequestInternalImpl(
Aws::Http::HttpRequest & request,
const DB::ProxyConfiguration & proxy_configuration,
std::shared_ptr<PocoHTTPResponse> & response,
Aws::Utils::RateLimits::RateLimiterInterface *,
Aws::Utils::RateLimits::RateLimiterInterface *) const
@@ -383,6 +381,7 @@ void PocoHTTPClient::makeRequestInternalImpl(
try
{
const auto proxy_configuration = per_request_configuration();
for (unsigned int attempt = 0; attempt <= s3_max_redirects; ++attempt)
{
Poco::URI target_uri(uri);

View File

@@ -156,7 +156,6 @@ private:
void makeRequestInternalImpl(
Aws::Http::HttpRequest & request,
const DB::ProxyConfiguration & proxy_configuration,
std::shared_ptr<PocoHTTPResponse> & response,
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const;

View File

@@ -457,7 +457,7 @@ std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> StorageURLSource:
const auto settings = context_->getSettings();
auto proxy_config = getProxyConfiguration(http_method);
auto proxy_config = getProxyConfiguration(request_uri.getScheme());
try
{
@@ -543,10 +543,11 @@ StorageURLSink::StorageURLSink(
std::string content_type = FormatFactory::instance().getContentType(format, context, format_settings);
std::string content_encoding = toContentEncodingName(compression_method);
auto proxy_config = getProxyConfiguration(http_method);
auto poco_uri = Poco::URI(uri);
auto proxy_config = getProxyConfiguration(poco_uri.getScheme());
auto write_buffer = std::make_unique<WriteBufferFromHTTP>(
HTTPConnectionGroupType::STORAGE, Poco::URI(uri), http_method, content_type, content_encoding, headers, timeouts, DBMS_DEFAULT_BUFFER_SIZE, proxy_config
HTTPConnectionGroupType::STORAGE, poco_uri, http_method, content_type, content_encoding, headers, timeouts, DBMS_DEFAULT_BUFFER_SIZE, proxy_config
);
const auto & settings = context->getSettingsRef();
@@ -1327,6 +1328,7 @@ std::optional<time_t> IStorageURLBase::tryGetLastModificationTime(
.withBufSize(settings.max_read_buffer_size)
.withRedirects(settings.max_http_get_redirects)
.withHeaders(headers)
.withProxy(proxy_config)
.create(credentials);
return buf->tryGetLastModificationTime();

View File

@@ -18,11 +18,7 @@ from github.IssueComment import IssueComment
from github.Repository import Repository
from ci_config import CHECK_DESCRIPTIONS, CheckDescription, StatusNames, CIConfig
from env_helper import (
GITHUB_REPOSITORY,
GITHUB_UPSTREAM_REPOSITORY,
TEMP_PATH,
)
from env_helper import GITHUB_REPOSITORY, GITHUB_UPSTREAM_REPOSITORY, TEMP_PATH
from lambda_shared_package.lambda_shared.pr import Labels
from pr_info import PRInfo
from report import (
@@ -84,7 +80,7 @@ def get_commit(gh: Github, commit_sha: str, retry_count: int = RETRY) -> Commit:
def post_commit_status(
commit: Commit,
state: Union[StatusType, str],
state: StatusType, # do not change it, it MUST be StatusType and nothing else
report_url: Optional[str] = None,
description: Optional[str] = None,
check_name: Optional[str] = None,

View File

@@ -13,11 +13,11 @@ from commit_status_helper import (
trigger_mergeable_check,
update_upstream_sync_status,
)
from env_helper import GITHUB_REPOSITORY, GITHUB_UPSTREAM_REPOSITORY
from get_robot_token import get_best_robot_token
from pr_info import PRInfo
from report import PENDING, SUCCESS, FAILURE
from report import FAILURE, PENDING, SUCCESS, StatusType
from synchronizer_utils import SYNC_BRANCH_PREFIX
from env_helper import GITHUB_REPOSITORY, GITHUB_UPSTREAM_REPOSITORY
def main():
@@ -81,7 +81,7 @@ def main():
else:
has_failure = True
ci_state = SUCCESS
ci_state = SUCCESS # type: StatusType
if has_failure:
ci_state = FAILURE
elif has_pending:

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3
import logging
from dataclasses import dataclass
import random
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union
import boto3 # type: ignore
@@ -18,6 +18,9 @@ class Token:
rest: int
SAFE_REQUESTS_LIMIT = 1000
def get_parameter_from_ssm(
name: str, decrypt: bool = True, client: Optional[Any] = None
) -> str:
@@ -94,7 +97,7 @@ def get_best_robot_token(tokens_path: str = "/github-tokens") -> str:
best_token = Token(user, value, rest)
elif best_token.rest < rest:
best_token = Token(user, value, rest)
if best_token.rest > 300:
if best_token.rest > SAFE_REQUESTS_LIMIT:
break
assert best_token
ROBOT_TOKEN = best_token

View File

@@ -2,21 +2,35 @@ import os
import time
ALL_HTTP_METHODS = {"POST", "PUT", "GET", "HEAD", "CONNECT"}
def check_proxy_logs(
cluster, proxy_instance, protocol, bucket, http_methods={"POST", "PUT", "GET"}
cluster, proxy_instances, protocol, bucket, requested_http_methods
):
for i in range(10):
logs = cluster.get_container_logs(proxy_instance)
# Check with retry that all possible interactions with Minio are present
for http_method in http_methods:
if (
logs.find(http_method + f" {protocol}://minio1:9001/root/data/{bucket}")
>= 0
):
return
for http_method in ALL_HTTP_METHODS:
for proxy_instance in proxy_instances:
logs = cluster.get_container_logs(proxy_instance)
if (
logs.find(
http_method + f" {protocol}://minio1:9001/root/data/{bucket}"
)
>= 0
):
if http_method not in requested_http_methods:
assert (
False
), f"Found http method {http_method} for bucket {bucket} that should not be found in {proxy_instance} logs"
break
else:
if http_method in requested_http_methods:
assert (
False
), f"{http_method} method not found in logs of {proxy_instance} for bucket {bucket}"
time.sleep(1)
else:
assert False, f"{http_methods} method not found in logs of {proxy_instance}"
def wait_resolver(cluster):
@@ -33,8 +47,8 @@ def wait_resolver(cluster):
if response == "proxy1" or response == "proxy2":
return
time.sleep(i)
else:
assert False, "Resolver is not up"
assert False, "Resolver is not up"
# Runs simple proxy resolver in python env container.
@@ -80,9 +94,33 @@ def perform_simple_queries(node, minio_endpoint):
def simple_test(cluster, proxies, protocol, bucket):
minio_endpoint = build_s3_endpoint(protocol, bucket)
node = cluster.instances[f"{bucket}"]
node = cluster.instances[bucket]
perform_simple_queries(node, minio_endpoint)
for proxy in proxies:
check_proxy_logs(cluster, proxy, protocol, bucket)
check_proxy_logs(cluster, proxies, protocol, bucket, ["PUT", "GET", "HEAD"])
def simple_storage_test(cluster, node, proxies, policy):
node.query(
"""
CREATE TABLE s3_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='{}'
""".format(
policy
)
)
node.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
assert (
node.query("SELECT * FROM s3_test order by id FORMAT Values")
== "(0,'data'),(1,'data')"
)
node.query("DROP TABLE IF EXISTS s3_test SYNC")
# not checking for POST because it is in a different format
check_proxy_logs(cluster, proxies, "http", policy, ["PUT", "GET"])

View File

@@ -5,7 +5,10 @@ import bottle
@bottle.route("/hostname")
def index():
return "proxy1"
if random.randrange(2) == 0:
return "proxy1"
else:
return "proxy2"
bottle.run(host="0.0.0.0", port=8080)

View File

@@ -56,7 +56,7 @@ def test_s3_with_https_proxy_list(cluster):
def test_s3_with_https_remote_proxy(cluster):
proxy_util.simple_test(cluster, ["proxy1"], "https", "remote_proxy_node")
proxy_util.simple_test(cluster, ["proxy1", "proxy2"], "https", "remote_proxy_node")
def test_s3_with_https_env_proxy(cluster):

View File

@@ -2,7 +2,6 @@
<proxy>
<http>
<uri>http://proxy1</uri>
<uri>http://proxy2</uri>
</http>
</proxy>
</clickhouse>

View File

@@ -3,7 +3,7 @@
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<endpoint>http://minio1:9001/root/data/s3</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>

View File

@@ -3,6 +3,7 @@ import time
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.s3_url_proxy_tests_util as proxy_util
@pytest.fixture(scope="module")
@@ -26,41 +27,8 @@ def cluster():
cluster.shutdown()
def check_proxy_logs(cluster, proxy_instance, http_methods={"POST", "PUT", "GET"}):
for i in range(10):
logs = cluster.get_container_logs(proxy_instance)
# Check with retry that all possible interactions with Minio are present
for http_method in http_methods:
if logs.find(http_method + " http://minio1") >= 0:
return
time.sleep(1)
else:
assert False, f"{http_methods} method not found in logs of {proxy_instance}"
@pytest.mark.parametrize("policy", ["s3"])
def test_s3_with_proxy_list(cluster, policy):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE s3_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='{}'
""".format(
policy
)
proxy_util.simple_storage_test(
cluster, cluster.instances["node"], ["proxy1"], policy
)
node.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
assert (
node.query("SELECT * FROM s3_test order by id FORMAT Values")
== "(0,'data'),(1,'data')"
)
node.query("DROP TABLE IF EXISTS s3_test SYNC")
for proxy in ["proxy1", "proxy2"]:
check_proxy_logs(cluster, proxy, ["PUT", "GET"])

View File

@@ -3,7 +3,7 @@
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<endpoint>http://minio1:9001/root/data/s3</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<proxy>
@@ -13,9 +13,10 @@
</s3>
<s3_with_resolver>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<endpoint>http://minio1:9001/root/data/s3_with_resolver</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<skip_access_check>true</skip_access_check>
<proxy>
<!--
At each interaction with S3 resolver sends empty GET request to specified endpoint URL to obtain proxy host.
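For context (illustrative sketch, not part of this commit's diff): the comment above refers to the remote-resolver proxy configuration that these integration tests exercise. A minimal example of such a block is shown below, using the element names from ClickHouse's proxy-resolver configuration (endpoint, proxy_scheme, proxy_port, proxy_cache_time); the URL, port, and TTL values are placeholders taken from the test setup rather than required values.
<proxy>
    <!-- The resolver endpoint is queried with an empty GET request and must return a bare proxy hostname in the response body. -->
    <resolver>
        <endpoint>http://resolver:8080/hostname</endpoint>
        <proxy_scheme>http</proxy_scheme>
        <proxy_port>8888</proxy_port>
        <proxy_cache_time>10</proxy_cache_time>
    </resolver>
</proxy>
The bottle-based resolver scripts elsewhere in this diff implement exactly that contract, answering the GET with "proxy1" or "proxy2" as plain text.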

View File

@@ -26,41 +26,8 @@ def cluster():
cluster.shutdown()
def check_proxy_logs(cluster, proxy_instance, http_methods={"POST", "PUT", "GET"}):
for i in range(10):
logs = cluster.get_container_logs(proxy_instance)
# Check with retry that all possible interactions with Minio are present
for http_method in http_methods:
if logs.find(http_method + " http://minio1") >= 0:
return
time.sleep(1)
else:
assert False, f"{http_methods} method not found in logs of {proxy_instance}"
@pytest.mark.parametrize("policy", ["s3", "s3_with_resolver"])
def test_s3_with_proxy_list(cluster, policy):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE s3_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='{}'
""".format(
policy
)
proxy_util.simple_storage_test(
cluster, cluster.instances["node"], ["proxy1", "proxy2"], policy
)
node.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
assert (
node.query("SELECT * FROM s3_test order by id FORMAT Values")
== "(0,'data'),(1,'data')"
)
node.query("DROP TABLE IF EXISTS s3_test SYNC")
for proxy in ["proxy1", "proxy2"]:
check_proxy_logs(cluster, proxy, ["PUT", "GET"])

View File

@@ -5,7 +5,10 @@ import bottle
@bottle.route("/hostname")
def index():
return "proxy1"
if random.randrange(2) == 0:
return "proxy1"
else:
return "proxy2"
bottle.run(host="0.0.0.0", port=8080)

View File

@@ -53,7 +53,7 @@ def test_s3_with_http_proxy_list(cluster):
def test_s3_with_http_remote_proxy(cluster):
proxy_util.simple_test(cluster, ["proxy1"], "http", "remote_proxy_node")
proxy_util.simple_test(cluster, ["proxy1", "proxy2"], "http", "remote_proxy_node")
def test_s3_with_http_env_proxy(cluster):

View File

@@ -1,9 +1,5 @@
<clickhouse>
<proxy>
<http>
<uri>http://proxy1</uri>
<uri>http://proxy2</uri>
</http>
<https>
<uri>https://proxy1</uri>
<uri>https://proxy2</uri>

View File

@@ -5,7 +5,10 @@ import bottle
@bottle.route("/hostname")
def index():
return "proxy1"
if random.randrange(2) == 0:
return "proxy1"
else:
return "proxy2"
bottle.run(host="0.0.0.0", port=8080)

View File

@@ -61,7 +61,7 @@ def test_s3_with_https_proxy_list(cluster):
def test_s3_with_https_remote_proxy(cluster):
proxy_util.simple_test(cluster, ["proxy1"], "https", "remote_proxy_node")
proxy_util.simple_test(cluster, ["proxy1", "proxy2"], "https", "remote_proxy_node")
def test_s3_with_https_env_proxy(cluster):