Merge pull request #63806 from jkartseva/add-endpoint-subpath-to-plain-rw

Add `endpoint_subpath` S3 URI setting
This commit is contained in:
Nikita Mikhaylov 2024-05-15 13:50:43 +00:00 committed by GitHub
commit 330af1c37a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 75 additions and 31 deletions

View File

@@ -139,7 +139,11 @@ namespace
S3::URI getS3URI(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const ContextPtr & context) S3::URI getS3URI(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const ContextPtr & context)
{ {
String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint")); String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
S3::URI uri(endpoint); String endpoint_subpath;
if (config.has(config_prefix + ".endpoint_subpath"))
endpoint_subpath = context->getMacros()->expand(config.getString(config_prefix + ".endpoint_subpath"));
S3::URI uri(fs::path(endpoint) / endpoint_subpath);
/// An empty key remains empty. /// An empty key remains empty.
if (!uri.key.empty() && !uri.key.ends_with('/')) if (!uri.key.empty() && !uri.key.ends_with('/'))

View File

@@ -4,6 +4,7 @@
<disk_s3_plain_rewritable> <disk_s3_plain_rewritable>
<type>s3_plain_rewritable</type> <type>s3_plain_rewritable</type>
<endpoint>http://minio1:9001/root/data/</endpoint> <endpoint>http://minio1:9001/root/data/</endpoint>
<endpoint_subpath from_env="ENDPOINT_SUBPATH"></endpoint_subpath>
<access_key_id>minio</access_key_id> <access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key> <secret_access_key>minio123</secret_access_key>
</disk_s3_plain_rewritable> </disk_s3_plain_rewritable>

View File

@@ -1,24 +1,39 @@
import pytest import pytest
import random import random
import string import string
import threading
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node", NUM_WORKERS = 5
nodes = []
for i in range(NUM_WORKERS):
name = "node{}".format(i + 1)
node = cluster.add_instance(
name,
main_configs=["configs/storage_conf.xml"], main_configs=["configs/storage_conf.xml"],
env_variables={"ENDPOINT_SUBPATH": name},
with_minio=True, with_minio=True,
stay_alive=True, stay_alive=True,
) )
nodes.append(node)
insert_values = [ MAX_ROWS = 1000
"(0,'data'),(1,'data')",
",".join(
def gen_insert_values(size):
return ",".join(
f"({i},'{''.join(random.choices(string.ascii_lowercase, k=5))}')" f"({i},'{''.join(random.choices(string.ascii_lowercase, k=5))}')"
for i in range(10) for i in range(size)
), )
]
insert_values = ",".join(
f"({i},'{''.join(random.choices(string.ascii_lowercase, k=5))}')" for i in range(10)
)
@pytest.fixture(scope="module", autouse=True) @pytest.fixture(scope="module", autouse=True)
@@ -32,47 +47,71 @@ def start_cluster():
@pytest.mark.order(0) @pytest.mark.order(0)
def test_insert(): def test_insert():
for index, value in enumerate(insert_values): def create_insert(node, insert_values):
node.query( node.query(
""" """
CREATE TABLE test_{} ( CREATE TABLE test (
id Int64, id Int64,
data String data String
) ENGINE=MergeTree() ) ENGINE=MergeTree()
ORDER BY id ORDER BY id
SETTINGS storage_policy='s3_plain_rewritable' SETTINGS storage_policy='s3_plain_rewritable'
""".format( """
index
)
) )
node.query("INSERT INTO test VALUES {}".format(insert_values))
node.query("INSERT INTO test_{} VALUES {}".format(index, value)) insert_values_arr = [
gen_insert_values(random.randint(1, MAX_ROWS)) for _ in range(0, NUM_WORKERS)
]
threads = []
for i in range(NUM_WORKERS):
t = threading.Thread(
target=create_insert, args=(nodes[i], insert_values_arr[i])
)
threads.append(t)
t.start()
for t in threads:
t.join()
for i in range(NUM_WORKERS):
assert ( assert (
node.query("SELECT * FROM test_{} ORDER BY id FORMAT Values".format(index)) nodes[i].query("SELECT * FROM test ORDER BY id FORMAT Values")
== value == insert_values_arr[i]
) )
@pytest.mark.order(1) @pytest.mark.order(1)
def test_restart(): def test_restart():
for index, value in enumerate(insert_values): insert_values_arr = []
assert ( for i in range(NUM_WORKERS):
node.query("SELECT * FROM test_{} ORDER BY id FORMAT Values".format(index)) insert_values_arr.append(
== value nodes[i].query("SELECT * FROM test ORDER BY id FORMAT Values")
) )
def restart(node):
node.restart_clickhouse() node.restart_clickhouse()
for index, value in enumerate(insert_values): threads = []
for i in range(NUM_WORKERS):
t = threading.Thread(target=restart, args=(nodes[i],))
threads.append(t)
t.start()
for t in threads:
t.join()
for i in range(NUM_WORKERS):
assert ( assert (
node.query("SELECT * FROM test_{} ORDER BY id FORMAT Values".format(index)) nodes[i].query("SELECT * FROM test ORDER BY id FORMAT Values")
== value == insert_values_arr[i]
) )
@pytest.mark.order(2) @pytest.mark.order(2)
def test_drop(): def test_drop():
for index, value in enumerate(insert_values): for i in range(NUM_WORKERS):
node.query("DROP TABLE IF EXISTS test_{} SYNC".format(index)) nodes[i].query("DROP TABLE IF EXISTS test SYNC")
it = cluster.minio_client.list_objects( it = cluster.minio_client.list_objects(
cluster.minio_bucket, "data/", recursive=True cluster.minio_bucket, "data/", recursive=True