# ClickHouse integration test for the s3Cluster table function.
# Located at tests/integration/test_s3_cluster/test.py in the ClickHouse repo.
import logging
import os
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
# Route INFO-level messages from this module and the test helpers to stderr
# so they appear in the pytest output of this integration test.
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
# Absolute directory of this test file; the fixture CSVs live underneath it.
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
# Fixture files uploaded to MinIO. Each path is used both as the on-disk
# location (relative to SCRIPT_DIR) and as the S3 object name.
S3_DATA = [
    "data/clickhouse/part1.csv",
    "data/clickhouse/part123.csv",
    "data/database/part2.csv",
    "data/database/partition675.csv",
]
def create_buckets_s3(cluster):
    """Upload the fixture CSV files to the cluster's MinIO bucket.

    Every path in S3_DATA (resolved relative to SCRIPT_DIR) is uploaded
    under the same object name, then the bucket contents are logged so
    test failures are easier to diagnose.
    """
    minio = cluster.minio_client
    for file in S3_DATA:
        minio.fput_object(
            bucket_name=cluster.minio_bucket,
            object_name=file,
            file_path=os.path.join(SCRIPT_DIR, file),
        )

    # Use logging (consistent with the rest of this module) rather than a
    # bare print to report what actually landed in the bucket.
    for obj in minio.list_objects(cluster.minio_bucket, recursive=True):
        logging.info("Uploaded object: %s", obj.object_name)
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture: a three-node ClickHouse cluster with MinIO.

    Yields the started ClickHouseCluster after uploading the fixture data
    to MinIO; always shuts the cluster down on teardown.
    """
    # Construct the cluster object BEFORE the try block: if construction
    # itself fails, the finally clause must not hit a NameError on
    # `cluster.shutdown()` and mask the real exception.
    cluster = ClickHouseCluster(__file__)
    try:
        # Only the first node carries MinIO; all three share the cluster config.
        cluster.add_instance(
            "s0_0_0", main_configs=["configs/cluster.xml"], with_minio=True
        )
        cluster.add_instance("s0_0_1", main_configs=["configs/cluster.xml"])
        cluster.add_instance("s0_1_0", main_configs=["configs/cluster.xml"])

        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        create_buckets_s3(cluster)

        yield cluster
    finally:
        cluster.shutdown()
def test_select_all(started_cluster):
    """s3Cluster must return exactly the same rows as the plain s3 function."""
    node = started_cluster.instances["s0_0_0"]
    pure_s3 = node.query(
        """
    SELECT * from s3(
        'http://minio1:9001/root/data/{clickhouse,database}/*',
        'minio', 'minio123', 'CSV',
        'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
    ORDER BY (name, value, polygon)"""
    )
    # Same query fanned out over the cluster; ORDER BY makes the row order
    # deterministic so the two results are directly comparable.
    s3_distributed = node.query(
        """
    SELECT * from s3Cluster(
        'cluster_simple',
        'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
        'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
    ORDER BY (name, value, polygon)"""
    )

    assert TSV(pure_s3) == TSV(s3_distributed)
def test_count(started_cluster):
    """count(*) over s3Cluster must match count(*) over plain s3."""
    node = started_cluster.instances["s0_0_0"]
    pure_s3 = node.query(
        """
    SELECT count(*) from s3(
        'http://minio1:9001/root/data/{clickhouse,database}/*',
        'minio', 'minio123', 'CSV',
        'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
    )
    s3_distributed = node.query(
        """
    SELECT count(*) from s3Cluster(
        'cluster_simple', 'http://minio1:9001/root/data/{clickhouse,database}/*',
        'minio', 'minio123', 'CSV',
        'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
    )

    assert TSV(pure_s3) == TSV(s3_distributed)
def test_count_macro(started_cluster):
    """Resolving the cluster via {default_cluster_macro} must give the same
    count(*) as naming 'cluster_simple' explicitly."""
    node = started_cluster.instances["s0_0_0"]

    macro_count = node.query(
        """
    SELECT count(*) from s3Cluster(
        '{default_cluster_macro}', 'http://minio1:9001/root/data/{clickhouse,database}/*',
        'minio', 'minio123', 'CSV',
        'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
    )
    explicit_count = node.query(
        """
    SELECT count(*) from s3Cluster(
        'cluster_simple', 'http://minio1:9001/root/data/{clickhouse,database}/*',
        'minio', 'minio123', 'CSV',
        'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
    )

    assert TSV(macro_count) == TSV(explicit_count)
def test_union_all(started_cluster):
    """A UNION ALL of two identical selects must behave the same through
    s3Cluster as through plain s3 (rows are simply duplicated)."""
    node = started_cluster.instances["s0_0_0"]
    pure_s3 = node.query(
        """
    SELECT * FROM
    (
        SELECT * from s3(
            'http://minio1:9001/root/data/{clickhouse,database}/*',
            'minio', 'minio123', 'CSV',
            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
        UNION ALL
        SELECT * from s3(
            'http://minio1:9001/root/data/{clickhouse,database}/*',
            'minio', 'minio123', 'CSV',
            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
    )
    ORDER BY (name, value, polygon)
    """
    )
    # Same UNION ALL, but distributed over the cluster.
    s3_distributed = node.query(
        """
    SELECT * FROM
    (
        SELECT * from s3Cluster(
            'cluster_simple',
            'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
        UNION ALL
        SELECT * from s3Cluster(
            'cluster_simple',
            'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
    )
    ORDER BY (name, value, polygon)
    """
    )

    assert TSV(pure_s3) == TSV(s3_distributed)
def test_wrong_cluster(started_cluster):
    """Referencing a cluster name that is not configured must fail with a
    'not found' error rather than silently returning data."""
    node = started_cluster.instances["s0_0_0"]
    error = node.query_and_get_error(
        """
    SELECT count(*) from s3Cluster(
        'non_existent_cluster',
        'http://minio1:9001/root/data/{clickhouse,database}/*',
        'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
    UNION ALL
    SELECT count(*) from s3Cluster(
        'non_existent_cluster',
        'http://minio1:9001/root/data/{clickhouse,database}/*',
        'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
    """
    )

    assert "not found" in error
def test_ambiguous_join(started_cluster):
    """A self-JOIN of two s3Cluster sources with identical column sets must
    not trip an AMBIGUOUS_COLUMN_NAME error."""
    node = started_cluster.instances["s0_0_0"]
    result = node.query(
        """
    SELECT l.name, r.value from s3Cluster(
        'cluster_simple',
        'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
        'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') as l
    JOIN s3Cluster(
        'cluster_simple',
        'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
        'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') as r
    ON l.name = r.name
    """
    )
    # node.query raises on server errors; additionally ensure no ambiguity
    # message leaked into the successful result text.
    assert "AMBIGUOUS_COLUMN_NAME" not in result
def test_skip_unavailable_shards(started_cluster):
    """With skip_unavailable_shards = 1, the unreachable shard is ignored
    and the single CSV file is still counted (10 rows)."""
    node = started_cluster.instances["s0_0_0"]

    row_count = node.query(
        """
    SELECT count(*) from s3Cluster(
        'cluster_non_existent_port',
        'http://minio1:9001/root/data/clickhouse/part1.csv',
        'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
    SETTINGS skip_unavailable_shards = 1
    """
    )

    assert row_count == "10\n"
def test_unskip_unavailable_shards(started_cluster):
    """Without skip_unavailable_shards, hitting the dead shard must surface
    a NETWORK_ERROR instead of returning partial results."""
    node = started_cluster.instances["s0_0_0"]

    error_text = node.query_and_get_error(
        """
    SELECT count(*) from s3Cluster(
        'cluster_non_existent_port',
        'http://minio1:9001/root/data/clickhouse/part1.csv',
        'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
    """
    )

    assert "NETWORK_ERROR" in error_text