mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
S3 with proxy integration test and fixes.
This commit is contained in:
parent
2ecf4b5a23
commit
e5e84960ea
@ -27,5 +27,16 @@ services:
|
||||
- SERVER_REDIRECT_CODE=307
|
||||
- SERVER_ACCESS_LOG=/nginx/access.log
|
||||
|
||||
# HTTP proxies for Minio.
|
||||
proxy1:
|
||||
image: vimagick/tinyproxy
|
||||
ports:
|
||||
- "4081:8888"
|
||||
|
||||
proxy2:
|
||||
image: vimagick/tinyproxy
|
||||
ports:
|
||||
- "4082:8888"
|
||||
|
||||
volumes:
|
||||
data1-1:
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include "DynamicProxyConfiguration.h"
|
||||
|
||||
#include <utility>
|
||||
#include <common/logger_useful.h>
|
||||
|
||||
namespace DB::S3
|
||||
{
|
||||
@ -18,7 +19,9 @@ Aws::Client::ClientConfigurationPerRequest DynamicProxyConfiguration::getConfigu
|
||||
cfg.proxyHost = proxies[index].getHost();
|
||||
cfg.proxyPort = proxies[index].getPort();
|
||||
|
||||
return cfg;
|
||||
LOG_DEBUG(&Logger::get("AWSClient"), "Use proxy: " << proxies[index].toString());
|
||||
|
||||
return cfg;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -37,18 +37,29 @@ namespace
|
||||
|
||||
std::unique_ptr<S3::DynamicProxyConfiguration> getProxyConfiguration(const Poco::Util::AbstractConfiguration * config)
|
||||
{
|
||||
if (config->has("proxies"))
|
||||
if (config->has("proxy"))
|
||||
{
|
||||
std::vector<String> keys;
|
||||
config->keys("proxies", keys);
|
||||
config->keys("proxy", keys);
|
||||
|
||||
std::vector<Poco::URI> proxies;
|
||||
for (const auto & key : keys)
|
||||
if (startsWith(key, "uri"))
|
||||
proxies.emplace_back(config->getString("proxies." + key));
|
||||
{
|
||||
Poco::URI proxy_uri(config->getString("proxy." + key));
|
||||
|
||||
if (proxy_uri.getScheme() != "http")
|
||||
throw Exception("Only HTTP scheme is allowed in proxy configuration at the moment, proxy uri: " + proxy_uri.toString(), ErrorCodes::BAD_ARGUMENTS);
|
||||
if (proxy_uri.getHost().empty())
|
||||
throw Exception("Empty host in proxy configuration, proxy uri: " + proxy_uri.toString(), ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
proxies.push_back(proxy_uri);
|
||||
|
||||
LOG_DEBUG(&Logger::get("DiskS3"), "Configured proxy: " << proxy_uri.toString());
|
||||
}
|
||||
|
||||
if (!proxies.empty())
|
||||
return std::make_unique<DB::S3::DynamicProxyConfiguration>(proxies);
|
||||
return std::make_unique<S3::DynamicProxyConfiguration>(proxies);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
@ -300,6 +300,15 @@ class ClickHouseCluster:
|
||||
handle = self.docker_client.containers.get(docker_id)
|
||||
return handle.attrs['NetworkSettings']['Networks'].values()[0]['IPAddress']
|
||||
|
||||
def get_container_id(self, instance_name):
|
||||
docker_id = self.get_instance_docker_id(instance_name)
|
||||
handle = self.docker_client.containers.get(docker_id)
|
||||
return handle.attrs['Id']
|
||||
|
||||
def get_container_logs(self, instance_name):
|
||||
container_id = self.get_container_id(instance_name)
|
||||
return self.docker_client.api.logs(container_id)
|
||||
|
||||
def wait_mysql_to_start(self, timeout=60):
|
||||
start = time.time()
|
||||
while time.time() - start < timeout:
|
||||
|
0
tests/integration/test_s3_with_proxy/__init__.py
Normal file
0
tests/integration/test_s3_with_proxy/__init__.py
Normal file
@ -0,0 +1,12 @@
|
||||
<yandex>
|
||||
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
|
||||
<logger>
|
||||
<level>trace</level>
|
||||
<log>/var/log/clickhouse-server/log.log</log>
|
||||
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
|
||||
<size>1000M</size>
|
||||
<count>10</count>
|
||||
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
|
||||
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
|
||||
</logger>
|
||||
</yandex>
|
@ -0,0 +1,25 @@
|
||||
<yandex>
|
||||
<storage_configuration>
|
||||
<disks>
|
||||
<s3>
|
||||
<type>s3</type>
|
||||
<endpoint>http://minio1:9001/root/data/</endpoint>
|
||||
<access_key_id>minio</access_key_id>
|
||||
<secret_access_key>minio123</secret_access_key>
|
||||
<proxy>
|
||||
<uri>http://proxy1:8888</uri>
|
||||
<uri>http://proxy2:8888</uri>
|
||||
</proxy>
|
||||
</s3>
|
||||
</disks>
|
||||
<policies>
|
||||
<s3>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>s3</disk>
|
||||
</main>
|
||||
</volumes>
|
||||
</s3>
|
||||
</policies>
|
||||
</storage_configuration>
|
||||
</yandex>
|
@ -0,0 +1,5 @@
|
||||
<yandex>
|
||||
<profiles>
|
||||
<default/>
|
||||
</profiles>
|
||||
</yandex>
|
20
tests/integration/test_s3_with_proxy/configs/config.xml
Normal file
20
tests/integration/test_s3_with_proxy/configs/config.xml
Normal file
@ -0,0 +1,20 @@
|
||||
<?xml version="1.0"?>
|
||||
<yandex>
|
||||
<tcp_port>9000</tcp_port>
|
||||
<listen_host>127.0.0.1</listen_host>
|
||||
|
||||
<openSSL>
|
||||
<client>
|
||||
<cacheSessions>true</cacheSessions>
|
||||
<verificationMode>none</verificationMode>
|
||||
<invalidCertificateHandler>
|
||||
<name>AcceptCertificateHandler</name>
|
||||
</invalidCertificateHandler>
|
||||
</client>
|
||||
</openSSL>
|
||||
|
||||
<max_concurrent_queries>500</max_concurrent_queries>
|
||||
<mark_cache_size>5368709120</mark_cache_size>
|
||||
<path>./clickhouse/</path>
|
||||
<users_config>users.xml</users_config>
|
||||
</yandex>
|
64
tests/integration/test_s3_with_proxy/test.py
Normal file
64
tests/integration/test_s3_with_proxy/test.py
Normal file
@ -0,0 +1,64 @@
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
logging.getLogger().addHandler(logging.StreamHandler())
|
||||
|
||||
|
||||
# Creates S3 bucket for tests and allows anonymous read-write access to it.
|
||||
def prepare_s3_bucket(cluster):
|
||||
minio_client = cluster.minio_client
|
||||
|
||||
if minio_client.bucket_exists(cluster.minio_bucket):
|
||||
minio_client.remove_bucket(cluster.minio_bucket)
|
||||
|
||||
minio_client.make_bucket(cluster.minio_bucket)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def cluster():
|
||||
try:
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
cluster.add_instance("node", config_dir="configs", with_minio=True)
|
||||
logging.info("Starting cluster...")
|
||||
cluster.start()
|
||||
logging.info("Cluster started")
|
||||
|
||||
prepare_s3_bucket(cluster)
|
||||
logging.info("S3 bucket created")
|
||||
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def check_proxy_logs(cluster, proxy_instance):
|
||||
logs = cluster.get_container_logs(proxy_instance)
|
||||
# Check that all possible interactions with Minio are present
|
||||
for http_method in ["POST", "PUT", "GET", "DELETE"]:
|
||||
assert logs.find(http_method + " http://minio1") >= 0
|
||||
|
||||
|
||||
def test_s3_with_proxy_list(cluster):
|
||||
node = cluster.instances["node"]
|
||||
|
||||
node.query(
|
||||
"""
|
||||
CREATE TABLE s3_test (
|
||||
id Int64,
|
||||
data String
|
||||
) ENGINE=MergeTree()
|
||||
ORDER BY id
|
||||
SETTINGS storage_policy='s3'
|
||||
"""
|
||||
)
|
||||
|
||||
node.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
|
||||
assert node.query("SELECT * FROM s3_test order by id FORMAT Values") == "(0,'data'),(1,'data')"
|
||||
|
||||
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
|
||||
|
||||
for proxy in ["proxy1", "proxy2"]:
|
||||
check_proxy_logs(cluster, proxy)
|
Loading…
Reference in New Issue
Block a user