Merge pull request #10744 from Jokser/s3-proxy-resolver

S3 with proxy resolver
This commit is contained in:
alexey-milovidov 2020-05-14 02:19:10 +03:00 committed by GitHub
commit f8195a577c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 301 additions and 80 deletions

2
contrib/aws vendored

@ -1 +1 @@
Subproject commit 04d54dfa0342d9465fb2eb3bfd4b77a3f7682e99
Subproject commit fb5c604525f5151d75a856462653e7e38b559b79

View File

@ -16,6 +16,9 @@ services:
interval: 30s
timeout: 20s
retries: 3
depends_on:
- redirect
- resolver
# Redirects all requests to origin Minio.
redirect:
@ -38,5 +41,15 @@ services:
ports:
- "4082:8888"
# Empty container to run proxy resolver.
resolver:
image: python:3
ports:
- "4083:8080"
tty: true
depends_on:
- proxy1
- proxy2
volumes:
data1-1:

View File

@ -391,7 +391,7 @@ private:
DiskS3::DiskS3(
String name_,
std::shared_ptr<Aws::S3::S3Client> client_,
std::shared_ptr<S3::DynamicProxyConfiguration> proxy_configuration_,
std::shared_ptr<S3::ProxyConfiguration> proxy_configuration_,
String bucket_,
String s3_root_path_,
String metadata_path_,

View File

@ -1,7 +1,7 @@
#pragma once
#include "Disks/DiskFactory.h"
#include "DynamicProxyConfiguration.h"
#include "ProxyConfiguration.h"
#include <aws/s3/S3Client.h>
#include <Poco/DirectoryIterator.h>
@ -22,7 +22,7 @@ public:
DiskS3(
String name_,
std::shared_ptr<Aws::S3::S3Client> client_,
std::shared_ptr<S3::DynamicProxyConfiguration> proxy_configuration_,
std::shared_ptr<S3::ProxyConfiguration> proxy_configuration_,
String bucket_,
String s3_root_path_,
String metadata_path_,
@ -102,7 +102,7 @@ private:
private:
const String name;
std::shared_ptr<Aws::S3::S3Client> client;
std::shared_ptr<S3::DynamicProxyConfiguration> proxy_configuration;
std::shared_ptr<S3::ProxyConfiguration> proxy_configuration;
const String bucket;
const String s3_root_path;
const String metadata_path;

View File

@ -1,24 +0,0 @@
#pragma once
#include <utility>
#include <Core/Types.h>
#include <aws/core/client/ClientConfiguration.h>
#include <Poco/URI.h>
namespace DB::S3
{
/// Supplies per-request proxy settings to the S3 client, picking a proxy
/// from a fixed list that is given at construction time.
class DynamicProxyConfiguration
{
public:
/// Takes the list of proxy URIs to choose from.
explicit DynamicProxyConfiguration(std::vector<Poco::URI> _proxies);
/// Returns proxy configuration on each HTTP request.
Aws::Client::ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request);
private:
/// List of configured proxies.
const std::vector<Poco::URI> proxies;
/// Access counter to get proxy using round-robin strategy.
/// Atomic, so it can be incremented without additional locking.
std::atomic<size_t> access_counter;
};
}

View File

@ -0,0 +1,18 @@
#pragma once
#include <utility>
#include <Core/Types.h>
#include <aws/core/client/ClientConfiguration.h>
#include <Poco/URI.h>
namespace DB::S3
{
/// Interface of an object that tells the S3 client which proxy to use.
/// Concrete strategies derive from it and implement getConfiguration().
class ProxyConfiguration
{
public:
/// Virtual, because implementations are owned and destroyed through pointers to this base class.
virtual ~ProxyConfiguration() = default;
/// Returns proxy configuration on each HTTP request.
/// The result carries the proxy scheme, host and port to use for the given request.
virtual Aws::Client::ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request) = 0;
};
}

View File

@ -1,21 +1,22 @@
#include "DynamicProxyConfiguration.h"
#include "ProxyListConfiguration.h"
#include <utility>
#include <common/logger_useful.h>
namespace DB::S3
{
DynamicProxyConfiguration::DynamicProxyConfiguration(std::vector<Poco::URI> _proxies) : proxies(std::move(_proxies)), access_counter(0)
ProxyListConfiguration::ProxyListConfiguration(std::vector<Poco::URI> proxies_) : proxies(std::move(proxies_)), access_counter(0)
{
}
Aws::Client::ClientConfigurationPerRequest DynamicProxyConfiguration::getConfiguration(const Aws::Http::HttpRequest &)
Aws::Client::ClientConfigurationPerRequest ProxyListConfiguration::getConfiguration(const Aws::Http::HttpRequest &)
{
/// Avoid atomic increment if number of proxies is 1.
size_t index = proxies.size() > 1 ? (access_counter++) % proxies.size() : 0;
Aws::Client::ClientConfigurationPerRequest cfg;
cfg.proxyScheme = Aws::Http::SchemeMapper::FromString(proxies[index].getScheme().c_str());
cfg.proxyHost = proxies[index].getHost();
cfg.proxyPort = proxies[index].getPort();

View File

@ -0,0 +1,23 @@
#pragma once
#include "ProxyConfiguration.h"
namespace DB::S3
{
/**
* Proxy configuration backed by a fixed list of proxy URIs.
* For each request to S3 it chooses a proxy from the specified list using round-robin strategy.
*/
class ProxyListConfiguration : public ProxyConfiguration
{
public:
/// Takes the list of proxy URIs to rotate through.
explicit ProxyListConfiguration(std::vector<Poco::URI> proxies_);
/// Returns the scheme, host and port of the proxy chosen for this request.
Aws::Client::ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request) override;
private:
/// List of configured proxies.
const std::vector<Poco::URI> proxies;
/// Access counter to get proxy using round-robin strategy.
/// Atomic, so it can be incremented without additional locking.
std::atomic<size_t> access_counter;
};
}

View File

@ -0,0 +1,60 @@
#include "ProxyResolverConfiguration.h"
#include <utility>
#include <IO/HTTPCommon.h>
#include "Poco/StreamCopier.h"
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <common/logger_useful.h>
namespace DB::S3
{
/// Stores the resolver endpoint together with the scheme and port that are
/// combined with every resolved proxy host. Nothing is contacted here.
ProxyResolverConfiguration::ProxyResolverConfiguration(const Poco::URI & endpoint_, String proxy_scheme_, unsigned proxy_port_)
    : endpoint(endpoint_)
    , proxy_scheme(std::move(proxy_scheme_))
    , proxy_port(proxy_port_)
{
}
/// Asks the resolver endpoint which proxy host to use and builds the
/// per-request proxy configuration from it.
///
/// Sends an empty GET / request to `endpoint` and reads the proxy host from
/// the response body. The host is combined with `proxy_scheme` and `proxy_port`.
///
/// Returns a default-constructed configuration (i.e. no proxy) when the proxy
/// cannot be obtained: any exception while talking to the resolver, a non-OK
/// HTTP status, or an empty response body. This function does not throw.
Aws::Client::ClientConfigurationPerRequest ProxyResolverConfiguration::getConfiguration(const Aws::Http::HttpRequest &)
{
    LOG_DEBUG(&Logger::get("AWSClient"), "Obtain proxy using resolver: " << endpoint.toString());

    /// 1 second is enough for now.
    /// TODO: Make timeouts configurable.
    ConnectionTimeouts timeouts(
        Poco::Timespan(1000000), /// Connection timeout.
        Poco::Timespan(1000000), /// Send timeout.
        Poco::Timespan(1000000)  /// Receive timeout.
    );

    Aws::Client::ClientConfigurationPerRequest cfg;

    try
    {
        /// Session creation is inside the guarded region as well: if it throws
        /// (presumably e.g. when the resolver host cannot be reached - TODO confirm),
        /// we still want the documented "no proxy" fallback instead of an exception
        /// escaping into the AWS client.
        auto session = makeHTTPSession(endpoint, timeouts);

        /// It should be just empty GET / request.
        Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_1_1);
        session->sendRequest(request);

        Poco::Net::HTTPResponse response;
        auto & response_body_stream = session->receiveResponse(response);

        String proxy_host;
        /// Read proxy host as string from response body.
        Poco::StreamCopier::copyToString(response_body_stream, proxy_host);

        /// The body of an error response is not a host name, don't use it as one.
        if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK)
        {
            LOG_WARNING(
                &Logger::get("AWSClient"),
                "Proxy resolver returned not OK status: " << static_cast<int>(response.getStatus()) << " " << response.getReason());
            return cfg;
        }

        /// Resolvers may terminate the host with a newline; a host never contains whitespace.
        const char * whitespace = " \t\r\n";
        const auto host_begin = proxy_host.find_first_not_of(whitespace);
        if (host_begin == String::npos)
        {
            LOG_WARNING(&Logger::get("AWSClient"), "Proxy resolver returned empty proxy host");
            return cfg;
        }
        const auto host_end = proxy_host.find_last_not_of(whitespace);
        proxy_host = proxy_host.substr(host_begin, host_end - host_begin + 1);

        LOG_DEBUG(&Logger::get("AWSClient"), "Use proxy: " << proxy_scheme << "://" << proxy_host << ":" << proxy_port);

        cfg.proxyScheme = Aws::Http::SchemeMapper::FromString(proxy_scheme.c_str());
        cfg.proxyHost = proxy_host;
        cfg.proxyPort = proxy_port;

        return cfg;
    }
    catch (...)
    {
        tryLogCurrentException("AWSClient", "Failed to obtain proxy");
        /// Don't use proxy if it can't be obtained.
        return cfg;
    }
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include "ProxyConfiguration.h"
namespace DB::S3
{
/**
* Proxy configuration where the proxy host is obtained each time from the specified endpoint.
* For each request to S3 it makes a GET request to the specified endpoint and reads the proxy host from the response body.
* The specified scheme and port are added to the obtained proxy host to form the complete proxy URL.
*/
class ProxyResolverConfiguration : public ProxyConfiguration
{
public:
/// endpoint_ - URI of the resolver service; proxy_scheme_ and proxy_port_ are applied to every resolved host.
ProxyResolverConfiguration(const Poco::URI & endpoint_, String proxy_scheme_, unsigned proxy_port_);
/// Queries the resolver endpoint and returns the proxy settings for this request.
Aws::Client::ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request) override;
private:
/// Endpoint to obtain a proxy host.
const Poco::URI endpoint;
/// Scheme for obtained proxy.
const String proxy_scheme;
/// Port for obtained proxy.
const unsigned proxy_port;
};
}

View File

@ -6,7 +6,9 @@
#include <Interpreters/Context.h>
#include "DiskS3.h"
#include "Disks/DiskFactory.h"
#include "DynamicProxyConfiguration.h"
#include "ProxyConfiguration.h"
#include "ProxyListConfiguration.h"
#include "ProxyResolverConfiguration.h"
namespace DB
{
@ -35,35 +37,67 @@ namespace
void checkRemoveAccess(IDisk & disk) { disk.remove("test_acl"); }
std::shared_ptr<S3::DynamicProxyConfiguration> getProxyConfiguration(const Poco::Util::AbstractConfiguration * config)
std::shared_ptr<S3::ProxyResolverConfiguration> getProxyResolverConfiguration(const Poco::Util::AbstractConfiguration * proxy_resolver_config)
{
if (config->has("proxy"))
{
std::vector<String> keys;
config->keys("proxy", keys);
auto endpoint = Poco::URI(proxy_resolver_config->getString("endpoint"));
auto proxy_scheme = proxy_resolver_config->getString("proxy_scheme");
if (proxy_scheme != "http" && proxy_scheme != "https")
throw Exception("Only HTTP/HTTPS schemas allowed in proxy resolver config: " + proxy_scheme, ErrorCodes::BAD_ARGUMENTS);
auto proxy_port = proxy_resolver_config->getUInt("proxy_port");
std::vector<Poco::URI> proxies;
for (const auto & key : keys)
if (startsWith(key, "uri"))
{
Poco::URI proxy_uri(config->getString("proxy." + key));
LOG_DEBUG(
&Logger::get("DiskS3"), "Configured proxy resolver: " << endpoint.toString() << ", Scheme: " << proxy_scheme << ", Port: " << proxy_port);
if (proxy_uri.getScheme() != "http")
throw Exception("Only HTTP scheme is allowed in proxy configuration at the moment, proxy uri: " + proxy_uri.toString(), ErrorCodes::BAD_ARGUMENTS);
if (proxy_uri.getHost().empty())
throw Exception("Empty host in proxy configuration, proxy uri: " + proxy_uri.toString(), ErrorCodes::BAD_ARGUMENTS);
return std::make_shared<S3::ProxyResolverConfiguration>(endpoint, proxy_scheme, proxy_port);
}
proxies.push_back(proxy_uri);
std::shared_ptr<S3::ProxyListConfiguration> getProxyListConfiguration(const Poco::Util::AbstractConfiguration * proxy_config)
{
std::vector<String> keys;
proxy_config->keys(keys);
LOG_DEBUG(&Logger::get("DiskS3"), "Configured proxy: " << proxy_uri.toString());
}
std::vector<Poco::URI> proxies;
for (const auto & key : keys)
if (startsWith(key, "uri"))
{
Poco::URI proxy_uri(proxy_config->getString(key));
if (proxy_uri.getScheme() != "http" && proxy_uri.getScheme() != "https")
throw Exception("Only HTTP/HTTPS schemas allowed in proxy uri: " + proxy_uri.toString(), ErrorCodes::BAD_ARGUMENTS);
if (proxy_uri.getHost().empty())
throw Exception("Empty host in proxy uri: " + proxy_uri.toString(), ErrorCodes::BAD_ARGUMENTS);
proxies.push_back(proxy_uri);
LOG_DEBUG(&Logger::get("DiskS3"), "Configured proxy: " << proxy_uri.toString());
}
if (!proxies.empty())
return std::make_shared<S3::ProxyListConfiguration>(proxies);
if (!proxies.empty())
return std::make_shared<S3::DynamicProxyConfiguration>(proxies);
}
return nullptr;
}
/// Builds the proxy configuration described by the "proxy" section of a disk config.
///
/// Returns nullptr if there is no "proxy" section. If the section contains a
/// "resolver" key, a resolver-based configuration is built from that subsection,
/// otherwise the section is treated as a list of proxy URIs.
///
/// Throws Exception(BAD_ARGUMENTS) if more than one "resolver" key is found.
std::shared_ptr<S3::ProxyConfiguration> getProxyConfiguration(const Poco::Util::AbstractConfiguration * config)
{
    if (!config->has("proxy"))
        return nullptr;

    /// createView() hands over a new reference-counted object as a raw pointer.
    /// Keep it in AutoPtr so that it is released on every exit path (including the
    /// throw below) instead of being leaked. The helpers called below only read
    /// values from the view, so it doesn't have to outlive this function.
    Poco::AutoPtr<const Poco::Util::AbstractConfiguration> proxy_config(config->createView("proxy"));

    std::vector<String> config_keys;
    proxy_config->keys(config_keys);

    if (auto resolver_configs = std::count(config_keys.begin(), config_keys.end(), "resolver"))
    {
        if (resolver_configs > 1)
            throw Exception("Multiple proxy resolver configurations aren't allowed", ErrorCodes::BAD_ARGUMENTS);

        Poco::AutoPtr<const Poco::Util::AbstractConfiguration> resolver_config(proxy_config->createView("resolver"));
        return getProxyResolverConfiguration(resolver_config.get());
    }

    return getProxyListConfiguration(proxy_config.get());
}
}
@ -100,7 +134,7 @@ void registerDiskS3(DiskFactory & factory)
auto s3disk = std::make_shared<DiskS3>(
name,
client,
std::move(proxy_config),
proxy_config,
uri.bucket,
uri.key,
metadata_path,

View File

@ -7,7 +7,8 @@ PEERDIR(
SRCS(
DiskS3.cpp
registerDiskS3.cpp
DynamicProxyConfiguration.cpp
ProxyListConfiguration.cpp
ProxyResolverConfiguration.cpp
)
END()

View File

@ -309,6 +309,32 @@ class ClickHouseCluster:
container_id = self.get_container_id(instance_name)
return self.docker_client.api.logs(container_id)
def exec_in_container(self, container_id, cmd, detach=False, **kwargs):
    """Run a command inside a container and return its output as text.

    :param container_id: id of the container to run the command in.
    :param cmd: command to run, either a list of arguments or a single string.
    :param detach: passed through to the Docker client's exec_start().
    :param kwargs: extra arguments passed through to exec_create() (e.g. user).
    :return: output of the command.
    :raises Exception: if the command finished with a non-zero exit code;
        container and image details are printed before raising.
    """
    exec_id = self.docker_client.api.exec_create(container_id, cmd, **kwargs)
    output = self.docker_client.api.exec_start(exec_id, detach=detach)
    # The Docker client may hand back already-decoded text (e.g. for a detached
    # exec) rather than raw bytes; only raw bytes need decoding.
    if isinstance(output, bytes):
        output = output.decode('utf8')

    exit_code = self.docker_client.api.exec_inspect(exec_id)['ExitCode']
    if exit_code:
        container_info = self.docker_client.api.inspect_container(container_id)
        image_id = container_info.get('Image')
        image_info = self.docker_client.api.inspect_image(image_id)
        print("Command failed in container {}: ".format(container_id))
        pprint.pprint(container_info)
        print("")
        print("Container {} uses image {}: ".format(container_id, image_id))
        pprint.pprint(image_info)
        print("")
        # A string command must be reported as is: joining it would put a space
        # between every character.
        cmd_text = ' '.join(cmd) if isinstance(cmd, (list, tuple)) else cmd
        raise Exception('Cmd "{}" failed in container {}. Return code {}. Output: {}'.format(cmd_text, container_id, exit_code, output))
    return output
def copy_file_to_container(self, container_id, local_path, dest_path):
    """Copy a local file into a container.

    The file content is base64-encoded, passed on the command line of a bash
    command executed in the container as root, and decoded there into dest_path.

    :param container_id: id of the target container.
    :param local_path: path of the file on the local machine.
    :param dest_path: path of the file to write inside the container.
    """
    # Read raw bytes: base64 works on bytes, and text mode could alter binary content.
    with open(local_path, 'rb') as fdata:
        data = fdata.read()
    # Turn the encoded bytes into text before formatting, otherwise the command
    # would contain the bytes repr (b'...') instead of the payload.
    encoded_data = base64.b64encode(data).decode('ascii')
    self.exec_in_container(container_id, ["bash", "-c", "echo {} | base64 --decode > {}".format(encoded_data, dest_path)],
                           user='root')
def wait_mysql_to_start(self, timeout=60):
start = time.time()
while time.time() - start < timeout:
@ -746,24 +772,8 @@ class ClickHouseInstance:
assert_eq_with_retry(self, "select 1", "1", retry_count=int(stop_start_wait_sec / 0.5), sleep_time=0.5)
def exec_in_container(self, cmd, detach=False, **kwargs):
container = self.get_docker_handle()
exec_id = self.docker_client.api.exec_create(container.id, cmd, **kwargs)
output = self.docker_client.api.exec_start(exec_id, detach=detach)
output = output.decode('utf8')
exit_code = self.docker_client.api.exec_inspect(exec_id)['ExitCode']
if exit_code:
container_info = self.docker_client.api.inspect_container(container.id)
image_id = container_info.get('Image')
image_info = self.docker_client.api.inspect_image(image_id)
print("Command failed in container {}: ".format(container.id))
pprint.pprint(container_info)
print("")
print("Container {} uses image {}: ".format(container.id, image_id))
pprint.pprint(image_info)
print("")
raise Exception('Cmd "{}" failed in container {}. Return code {}. Output: {}'.format(' '.join(cmd), container.id, exit_code, output))
return output
container_id = self.get_docker_handle().id
return self.cluster.exec_in_container(container_id, cmd, detach, **kwargs)
def contains_in_log(self, substring):
result = self.exec_in_container(
@ -771,11 +781,8 @@ class ClickHouseInstance:
return len(result) > 0
def copy_file_to_container(self, local_path, dest_path):
with open(local_path, 'r') as fdata:
data = fdata.read()
encoded_data = base64.b64encode(data)
self.exec_in_container(["bash", "-c", "echo {} | base64 --decode > {}".format(encoded_data, dest_path)],
user='root')
container_id = self.get_docker_handle().id
return self.cluster.copy_file_to_container(container_id, local_path, dest_path)
def get_process_pid(self, process_name):
output = self.exec_in_container(["bash", "-c",

View File

@ -11,6 +11,24 @@
<uri>http://proxy2:8888</uri>
</proxy>
</s3>
<s3_with_resolver>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<proxy>
<!--
On each interaction with S3, the resolver configuration sends an empty GET / request to the specified endpoint to obtain a proxy host.
The proxy host is returned as a string in the response body.
The S3 client then uses the proxy URL formed as proxy_scheme://proxy_host:proxy_port to make the request.
-->
<resolver>
<endpoint>http://resolver:8080</endpoint>
<proxy_scheme>http</proxy_scheme>
<proxy_port>8888</proxy_port>
</resolver>
</proxy>
</s3_with_resolver>
</disks>
<policies>
<s3>
@ -20,6 +38,13 @@
</main>
</volumes>
</s3>
<s3_with_resolver>
<volumes>
<main>
<disk>s3_with_resolver</disk>
</main>
</volumes>
</s3_with_resolver>
</policies>
</storage_configuration>
</yandex>

View File

@ -0,0 +1,4 @@
#!/bin/bash
# Entry point of the proxy resolver container:
# installs the bottle package and starts the resolver script.

# Stop at the first failing command, so that a failed install is reported
# as such instead of being masked by a later import error.
set -e

pip install bottle
python resolver.py

View File

@ -0,0 +1,13 @@
import bottle
import random
@bottle.route('/')
def index():
    """Handle GET /: answer with the host name of one of the two proxies, picked at random."""
    proxy_hosts = ('proxy1', 'proxy2')
    return proxy_hosts[random.randrange(2)]
bottle.run(host='0.0.0.0', port=8080)

View File

@ -1,4 +1,5 @@
import logging
import os
import pytest
from helpers.cluster import ClickHouseCluster
@ -17,6 +18,17 @@ def prepare_s3_bucket(cluster):
minio_client.make_bucket(cluster.minio_bucket)
# Runs simple proxy resolver in python env container.
def run_resolver(cluster):
    """Start the proxy resolver inside the 'resolver' container.

    Copies resolver.py and entrypoint.sh from the local "proxy-resolver"
    directory (next to this file) into the container, then launches
    entrypoint.sh there without waiting for it to finish.
    """
    resolver_id = cluster.get_container_id('resolver')
    scripts_dir = os.path.join(os.path.dirname(__file__), "proxy-resolver")
    for script_name in ("resolver.py", "entrypoint.sh"):
        cluster.copy_file_to_container(resolver_id, os.path.join(scripts_dir, script_name), script_name)
    cluster.exec_in_container(resolver_id, ["/bin/bash", "entrypoint.sh"], detach=True)
@pytest.fixture(scope="module")
def cluster():
try:
@ -29,6 +41,9 @@ def cluster():
prepare_s3_bucket(cluster)
logging.info("S3 bucket created")
run_resolver(cluster)
logging.info("Proxy resolver started")
yield cluster
finally:
cluster.shutdown()
@ -41,7 +56,10 @@ def check_proxy_logs(cluster, proxy_instance):
assert logs.find(http_method + " http://minio1") >= 0
def test_s3_with_proxy_list(cluster):
@pytest.mark.parametrize(
"policy", ["s3", "s3_with_resolver"]
)
def test_s3_with_proxy_list(cluster, policy):
node = cluster.instances["node"]
node.query(
@ -51,8 +69,9 @@ def test_s3_with_proxy_list(cluster):
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='s3'
SETTINGS storage_policy='{}'
"""
.format(policy)
)
node.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")