Merge pull request #10576 from Jokser/s3-with-dynamic-proxies

S3 with dynamic proxy configuration
This commit is contained in:
alexey-milovidov 2020-05-03 01:25:13 +03:00 committed by GitHub
commit a235533d5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 381 additions and 103 deletions

2
contrib/aws vendored

@ -1 +1 @@
Subproject commit 45dd8552d3c492defca79d2720bcc809e35654da
Subproject commit 04d54dfa0342d9465fb2eb3bfd4b77a3f7682e99

View File

@ -27,5 +27,16 @@ services:
- SERVER_REDIRECT_CODE=307
- SERVER_ACCESS_LOG=/nginx/access.log
# HTTP proxies for Minio.
proxy1:
image: vimagick/tinyproxy
ports:
- "4081:8888"
proxy2:
image: vimagick/tinyproxy
ports:
- "4082:8888"
volumes:
data1-1:

View File

@ -234,6 +234,9 @@ if(USE_RDKAFKA)
add_headers_and_sources(dbms Storages/Kafka)
endif()
if (USE_AWS_S3)
add_headers_and_sources(dbms Disks/S3)
endif()
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})

View File

@ -1,34 +1,31 @@
#include "DiskS3.h"
#if USE_AWS_S3
# include "DiskFactory.h"
#include "Disks/DiskFactory.h"
# include <random>
# include <utility>
# include <IO/ReadBufferFromFile.h>
# include <IO/ReadBufferFromS3.h>
# include <IO/ReadHelpers.h>
# include <IO/S3Common.h>
# include <IO/WriteBufferFromFile.h>
# include <IO/WriteBufferFromS3.h>
# include <IO/WriteHelpers.h>
# include <Poco/File.h>
# include <Common/checkStackSize.h>
# include <Common/createHardLink.h>
# include <Common/quoteString.h>
# include <Common/thread_local_rng.h>
#include <random>
#include <utility>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <Poco/File.h>
#include <Common/checkStackSize.h>
#include <Common/createHardLink.h>
#include <Common/quoteString.h>
#include <Common/thread_local_rng.h>
# include <aws/s3/model/CopyObjectRequest.h>
# include <aws/s3/model/DeleteObjectRequest.h>
# include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int FILE_ALREADY_EXISTS;
extern const int PATH_ACCESS_DENIED;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int UNKNOWN_FORMAT;
}
@ -394,12 +391,14 @@ private:
DiskS3::DiskS3(
String name_,
std::shared_ptr<Aws::S3::S3Client> client_,
std::shared_ptr<S3::DynamicProxyConfiguration> proxy_configuration_,
String bucket_,
String s3_root_path_,
String metadata_path_,
size_t min_upload_part_size_)
: name(std::move(name_))
, client(std::move(client_))
, proxy_configuration(std::move(proxy_configuration_))
, bucket(std::move(bucket_))
, s3_root_path(std::move(s3_root_path_))
, metadata_path(std::move(metadata_path_))
@ -686,64 +685,4 @@ DiskS3Reservation::~DiskS3Reservation()
}
}
namespace
{
void checkWriteAccess(IDisk & disk)
{
auto file = disk.writeFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
file->write("test", 4);
}
void checkReadAccess(const String & disk_name, IDisk & disk)
{
auto file = disk.readFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE);
String buf(4, '0');
file->readStrict(buf.data(), 4);
if (buf != "test")
throw Exception("No read access to S3 bucket in disk " + disk_name, ErrorCodes::PATH_ACCESS_DENIED);
}
void checkRemoveAccess(IDisk & disk)
{
disk.remove("test_acl");
}
}
void registerDiskS3(DiskFactory & factory)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const Context & context) -> DiskPtr {
Poco::File disk{context.getPath() + "disks/" + name};
disk.createDirectories();
S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint")));
auto client = S3::ClientFactory::instance().create(
uri.endpoint,
config.getString(config_prefix + ".access_key_id", ""),
config.getString(config_prefix + ".secret_access_key", ""));
if (uri.key.back() != '/')
throw Exception("S3 path must ends with '/', but '" + uri.key + "' doesn't.", ErrorCodes::LOGICAL_ERROR);
String metadata_path = context.getPath() + "disks/" + name + "/";
auto s3disk
= std::make_shared<DiskS3>(name, client, uri.bucket, uri.key, metadata_path, context.getSettingsRef().s3_min_upload_part_size);
/// This code is used only to check access to the corresponding disk.
checkWriteAccess(*s3disk);
checkReadAccess(name, *s3disk);
checkRemoveAccess(*s3disk);
return s3disk;
};
factory.registerDiskType("s3", creator);
}
}
#endif

View File

@ -1,14 +1,10 @@
#pragma once
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
#endif
#include "Disks/DiskFactory.h"
#include "DynamicProxyConfiguration.h"
#if USE_AWS_S3
# include "DiskFactory.h"
# include <aws/s3/S3Client.h>
# include <Poco/DirectoryIterator.h>
#include <aws/s3/S3Client.h>
#include <Poco/DirectoryIterator.h>
namespace DB
@ -26,6 +22,7 @@ public:
DiskS3(
String name_,
std::shared_ptr<Aws::S3::S3Client> client_,
std::shared_ptr<S3::DynamicProxyConfiguration> proxy_configuration_,
String bucket_,
String s3_root_path_,
String metadata_path_,
@ -105,6 +102,7 @@ private:
private:
const String name;
std::shared_ptr<Aws::S3::S3Client> client;
std::shared_ptr<S3::DynamicProxyConfiguration> proxy_configuration;
const String bucket;
const String s3_root_path;
const String metadata_path;
@ -116,5 +114,3 @@ private:
};
}
#endif

View File

@ -0,0 +1,27 @@
#include "DynamicProxyConfiguration.h"
#include <utility>
#include <common/logger_useful.h>
namespace DB::S3
{
DynamicProxyConfiguration::DynamicProxyConfiguration(std::vector<Poco::URI> _proxies) : proxies(std::move(_proxies)), access_counter(0)
{
}
Aws::Client::ClientConfigurationPerRequest DynamicProxyConfiguration::getConfiguration(const Aws::Http::HttpRequest &)
{
/// Avoid atomic increment if number of proxies is 1.
size_t index = proxies.size() > 1 ? (access_counter++) % proxies.size() : 0;
Aws::Client::ClientConfigurationPerRequest cfg;
cfg.proxyHost = proxies[index].getHost();
cfg.proxyPort = proxies[index].getPort();
LOG_DEBUG(&Logger::get("AWSClient"), "Use proxy: " << proxies[index].toString());
return cfg;
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <utility>
#include <Core/Types.h>
#include <aws/core/client/ClientConfiguration.h>
#include <Poco/URI.h>
namespace DB::S3
{
class DynamicProxyConfiguration
{
public:
explicit DynamicProxyConfiguration(std::vector<Poco::URI> _proxies);
/// Returns proxy configuration on each HTTP request.
Aws::Client::ClientConfigurationPerRequest getConfiguration(const Aws::Http::HttpRequest & request);
private:
/// List of configured proxies.
const std::vector<Poco::URI> proxies;
/// Access counter to get proxy using round-robin strategy.
std::atomic<size_t> access_counter;
};
}

View File

@ -0,0 +1,119 @@
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadHelpers.h>
#include <IO/S3Common.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include "DiskS3.h"
#include "Disks/DiskFactory.h"
#include "DynamicProxyConfiguration.h"
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int PATH_ACCESS_DENIED;
}
namespace
{
void checkWriteAccess(IDisk & disk)
{
auto file = disk.writeFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
file->write("test", 4);
}
void checkReadAccess(const String & disk_name, IDisk & disk)
{
auto file = disk.readFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE);
String buf(4, '0');
file->readStrict(buf.data(), 4);
if (buf != "test")
throw Exception("No read access to S3 bucket in disk " + disk_name, ErrorCodes::PATH_ACCESS_DENIED);
}
void checkRemoveAccess(IDisk & disk) { disk.remove("test_acl"); }
std::shared_ptr<S3::DynamicProxyConfiguration> getProxyConfiguration(const Poco::Util::AbstractConfiguration * config)
{
if (config->has("proxy"))
{
std::vector<String> keys;
config->keys("proxy", keys);
std::vector<Poco::URI> proxies;
for (const auto & key : keys)
if (startsWith(key, "uri"))
{
Poco::URI proxy_uri(config->getString("proxy." + key));
if (proxy_uri.getScheme() != "http")
throw Exception("Only HTTP scheme is allowed in proxy configuration at the moment, proxy uri: " + proxy_uri.toString(), ErrorCodes::BAD_ARGUMENTS);
if (proxy_uri.getHost().empty())
throw Exception("Empty host in proxy configuration, proxy uri: " + proxy_uri.toString(), ErrorCodes::BAD_ARGUMENTS);
proxies.push_back(proxy_uri);
LOG_DEBUG(&Logger::get("DiskS3"), "Configured proxy: " << proxy_uri.toString());
}
if (!proxies.empty())
return std::make_shared<S3::DynamicProxyConfiguration>(proxies);
}
return nullptr;
}
}
void registerDiskS3(DiskFactory & factory)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const Context & context) -> DiskPtr {
const auto * disk_config = config.createView(config_prefix);
Poco::File disk{context.getPath() + "disks/" + name};
disk.createDirectories();
Aws::Client::ClientConfiguration cfg;
S3::URI uri(Poco::URI(disk_config->getString("endpoint")));
if (uri.key.back() != '/')
throw Exception("S3 path must ends with '/', but '" + uri.key + "' doesn't.", ErrorCodes::BAD_ARGUMENTS);
cfg.endpointOverride = uri.endpoint;
auto proxy_config = getProxyConfiguration(disk_config);
if (proxy_config)
cfg.perRequestConfiguration = [proxy_config](const auto & request) { return proxy_config->getConfiguration(request); };
auto client = S3::ClientFactory::instance().create(
cfg,
disk_config->getString("access_key_id", ""),
disk_config->getString("secret_access_key", ""));
String metadata_path = context.getPath() + "disks/" + name + "/";
auto s3disk = std::make_shared<DiskS3>(
name,
client,
std::move(proxy_config),
uri.bucket,
uri.key,
metadata_path,
context.getSettingsRef().s3_min_upload_part_size);
/// This code is used only to check access to the corresponding disk.
checkWriteAccess(*s3disk);
checkReadAccess(name, *s3disk);
checkRemoveAccess(*s3disk);
return s3disk;
};
factory.registerDiskType("s3", creator);
}
}

13
src/Disks/S3/ya.make Normal file
View File

@ -0,0 +1,13 @@
LIBRARY()
PEERDIR(
clickhouse/src/Common
)
SRCS(
DiskS3.cpp
registerDiskS3.cpp
DynamicProxyConfiguration.cpp
)
END()

View File

@ -8,7 +8,6 @@ SRCS(
DiskFactory.cpp
DiskLocal.cpp
DiskMemory.cpp
DiskS3.cpp
DiskSpaceMonitor.cpp
IDisk.cpp
registerDisks.cpp

View File

@ -66,7 +66,7 @@ namespace S3
{
ClientFactory::ClientFactory()
{
aws_options = Aws::SDKOptions {};
aws_options = Aws::SDKOptions{};
Aws::InitAPI(aws_options);
Aws::Utils::Logging::InitializeAWSLogging(std::make_shared<AWSLogger>());
}
@ -83,7 +83,7 @@ namespace S3
return ret;
}
std::shared_ptr<Aws::S3::S3Client> ClientFactory::create( // NOLINT
std::shared_ptr<Aws::S3::S3Client> ClientFactory::create(
const String & endpoint,
const String & access_key_id,
const String & secret_access_key)
@ -92,17 +92,24 @@ namespace S3
if (!endpoint.empty())
cfg.endpointOverride = endpoint;
return create(cfg, access_key_id, secret_access_key);
}
std::shared_ptr<Aws::S3::S3Client> ClientFactory::create(
Aws::Client::ClientConfiguration & cfg,
const String & access_key_id,
const String & secret_access_key)
{
Aws::Auth::AWSCredentials credentials(access_key_id, secret_access_key);
return std::make_shared<Aws::S3::S3Client>(
credentials, // Aws credentials.
std::move(cfg), // Client configuration.
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, // Sign policy.
endpoint.empty() // Use virtual addressing only if endpoint is not specified.
credentials, // Aws credentials.
std::move(cfg), // Client configuration.
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, // Sign policy.
cfg.endpointOverride.empty() // Use virtual addressing only if endpoint is not specified.
);
}
URI::URI(const Poco::URI & uri_)
{
/// Case when bucket name represented in domain name of S3 URL.

View File

@ -23,10 +23,15 @@ public:
static ClientFactory & instance();
std::shared_ptr<Aws::S3::S3Client> create(const String & endpoint,
const String & access_key_id,
const String & secret_access_key);
static std::shared_ptr<Aws::S3::S3Client> create(
const String & endpoint,
const String & access_key_id,
const String & secret_access_key);
static std::shared_ptr<Aws::S3::S3Client> create(
Aws::Client::ClientConfiguration & cfg,
const String & access_key_id,
const String & secret_access_key);
private:
ClientFactory();

View File

@ -300,6 +300,15 @@ class ClickHouseCluster:
handle = self.docker_client.containers.get(docker_id)
return handle.attrs['NetworkSettings']['Networks'].values()[0]['IPAddress']
def get_container_id(self, instance_name):
docker_id = self.get_instance_docker_id(instance_name)
handle = self.docker_client.containers.get(docker_id)
return handle.attrs['Id']
def get_container_logs(self, instance_name):
container_id = self.get_container_id(instance_name)
return self.docker_client.api.logs(container_id)
def wait_mysql_to_start(self, timeout=60):
start = time.time()
while time.time() - start < timeout:

View File

@ -0,0 +1,12 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -0,0 +1,25 @@
<yandex>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<proxy>
<uri>http://proxy1:8888</uri>
<uri>http://proxy2:8888</uri>
</proxy>
</s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
</yandex>

View File

@ -0,0 +1,5 @@
<yandex>
<profiles>
<default/>
</profiles>
</yandex>

View File

@ -0,0 +1,20 @@
<?xml version="1.0"?>
<yandex>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
</yandex>

View File

@ -0,0 +1,64 @@
import logging
import pytest
from helpers.cluster import ClickHouseCluster
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
# Creates S3 bucket for tests and allows anonymous read-write access to it.
def prepare_s3_bucket(cluster):
minio_client = cluster.minio_client
if minio_client.bucket_exists(cluster.minio_bucket):
minio_client.remove_bucket(cluster.minio_bucket)
minio_client.make_bucket(cluster.minio_bucket)
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node", config_dir="configs", with_minio=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
prepare_s3_bucket(cluster)
logging.info("S3 bucket created")
yield cluster
finally:
cluster.shutdown()
def check_proxy_logs(cluster, proxy_instance):
logs = cluster.get_container_logs(proxy_instance)
# Check that all possible interactions with Minio are present
for http_method in ["POST", "PUT", "GET", "DELETE"]:
assert logs.find(http_method + " http://minio1") >= 0
def test_s3_with_proxy_list(cluster):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE s3_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='s3'
"""
)
node.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
assert node.query("SELECT * FROM s3_test order by id FORMAT Values") == "(0,'data'),(1,'data')"
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
for proxy in ["proxy1", "proxy2"]:
check_proxy_logs(cluster, proxy)