mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-14 18:32:29 +00:00
430 lines
14 KiB
Python
430 lines
14 KiB
Python
from email.errors import HeaderParseError
|
|
import logging
|
|
import os
|
|
import csv
|
|
import shutil
|
|
import time
|
|
|
|
import pytest
|
|
from helpers.cluster import ClickHouseCluster
|
|
from helpers.test_tools import TSV
|
|
from helpers.mock_servers import start_mock_servers
|
|
|
|
logging.getLogger().setLevel(logging.INFO)
|
|
logging.getLogger().addHandler(logging.StreamHandler())
|
|
|
|
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
|
S3_DATA = [
|
|
"data/clickhouse/part1.csv",
|
|
"data/clickhouse/part123.csv",
|
|
"data/database/part2.csv",
|
|
"data/database/partition675.csv",
|
|
]
|
|
|
|
|
|
def create_buckets_s3(cluster):
|
|
minio = cluster.minio_client
|
|
|
|
for file_number in range(100):
|
|
file_name = f"data/generated/file_{file_number}.csv"
|
|
os.makedirs(os.path.join(SCRIPT_DIR, "data/generated/"), exist_ok=True)
|
|
S3_DATA.append(file_name)
|
|
with open(os.path.join(SCRIPT_DIR, file_name), "w+", encoding="utf-8") as f:
|
|
# a String, b UInt64
|
|
data = []
|
|
|
|
# Make all files a bit different
|
|
for number in range(100 + file_number):
|
|
data.append([str(number + file_number) * 10, number + file_number])
|
|
|
|
writer = csv.writer(f)
|
|
writer.writerows(data)
|
|
|
|
for file in S3_DATA:
|
|
minio.fput_object(
|
|
bucket_name=cluster.minio_bucket,
|
|
object_name=file,
|
|
file_path=os.path.join(SCRIPT_DIR, file),
|
|
)
|
|
for obj in minio.list_objects(cluster.minio_bucket, recursive=True):
|
|
print(obj.object_name)
|
|
|
|
|
|
def run_s3_mocks(started_cluster):
|
|
script_dir = os.path.join(os.path.dirname(__file__), "s3_mocks")
|
|
start_mock_servers(
|
|
started_cluster,
|
|
script_dir,
|
|
[
|
|
("s3_mock.py", "resolver", "8080"),
|
|
],
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def started_cluster():
|
|
try:
|
|
cluster = ClickHouseCluster(__file__)
|
|
cluster.add_instance(
|
|
"s0_0_0",
|
|
main_configs=["configs/cluster.xml", "configs/named_collections.xml"],
|
|
user_configs=["configs/users.xml"],
|
|
macros={"replica": "node1", "shard": "shard1"},
|
|
with_minio=True,
|
|
with_zookeeper=True,
|
|
)
|
|
cluster.add_instance(
|
|
"s0_0_1",
|
|
main_configs=["configs/cluster.xml", "configs/named_collections.xml"],
|
|
user_configs=["configs/users.xml"],
|
|
macros={"replica": "replica2", "shard": "shard1"},
|
|
with_zookeeper=True,
|
|
)
|
|
cluster.add_instance(
|
|
"s0_1_0",
|
|
main_configs=["configs/cluster.xml", "configs/named_collections.xml"],
|
|
user_configs=["configs/users.xml"],
|
|
macros={"replica": "replica1", "shard": "shard2"},
|
|
with_zookeeper=True,
|
|
)
|
|
|
|
logging.info("Starting cluster...")
|
|
cluster.start()
|
|
logging.info("Cluster started")
|
|
|
|
create_buckets_s3(cluster)
|
|
|
|
run_s3_mocks(cluster)
|
|
|
|
yield cluster
|
|
finally:
|
|
shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"))
|
|
cluster.shutdown()
|
|
|
|
|
|
def test_select_all(started_cluster):
|
|
node = started_cluster.instances["s0_0_0"]
|
|
pure_s3 = node.query(
|
|
"""
|
|
SELECT * from s3(
|
|
'http://minio1:9001/root/data/{clickhouse,database}/*',
|
|
'minio', 'minio123', 'CSV',
|
|
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
|
|
ORDER BY (name, value, polygon)"""
|
|
)
|
|
# print(pure_s3)
|
|
s3_distributed = node.query(
|
|
"""
|
|
SELECT * from s3Cluster(
|
|
'cluster_simple',
|
|
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
|
|
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)"""
|
|
)
|
|
# print(s3_distributed)
|
|
|
|
assert TSV(pure_s3) == TSV(s3_distributed)
|
|
|
|
|
|
def test_count(started_cluster):
|
|
node = started_cluster.instances["s0_0_0"]
|
|
pure_s3 = node.query(
|
|
"""
|
|
SELECT count(*) from s3(
|
|
'http://minio1:9001/root/data/{clickhouse,database}/*',
|
|
'minio', 'minio123', 'CSV',
|
|
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
|
|
)
|
|
# print(pure_s3)
|
|
s3_distributed = node.query(
|
|
"""
|
|
SELECT count(*) from s3Cluster(
|
|
'cluster_simple', 'http://minio1:9001/root/data/{clickhouse,database}/*',
|
|
'minio', 'minio123', 'CSV',
|
|
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
|
|
)
|
|
# print(s3_distributed)
|
|
|
|
assert TSV(pure_s3) == TSV(s3_distributed)
|
|
|
|
|
|
def test_count_macro(started_cluster):
|
|
node = started_cluster.instances["s0_0_0"]
|
|
|
|
s3_macro = node.query(
|
|
"""
|
|
SELECT count(*) from s3Cluster(
|
|
'{default_cluster_macro}', 'http://minio1:9001/root/data/{clickhouse,database}/*',
|
|
'minio', 'minio123', 'CSV',
|
|
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
|
|
)
|
|
# print(s3_distributed)
|
|
s3_distributed = node.query(
|
|
"""
|
|
SELECT count(*) from s3Cluster(
|
|
'cluster_simple', 'http://minio1:9001/root/data/{clickhouse,database}/*',
|
|
'minio', 'minio123', 'CSV',
|
|
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
|
|
)
|
|
# print(s3_distributed)
|
|
|
|
assert TSV(s3_macro) == TSV(s3_distributed)
|
|
|
|
|
|
def test_union_all(started_cluster):
|
|
node = started_cluster.instances["s0_0_0"]
|
|
pure_s3 = node.query(
|
|
"""
|
|
SELECT * FROM
|
|
(
|
|
SELECT * from s3(
|
|
'http://minio1:9001/root/data/{clickhouse,database}/*',
|
|
'minio', 'minio123', 'CSV',
|
|
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
|
|
UNION ALL
|
|
SELECT * from s3(
|
|
'http://minio1:9001/root/data/{clickhouse,database}/*',
|
|
'minio', 'minio123', 'CSV',
|
|
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
|
|
)
|
|
ORDER BY (name, value, polygon)
|
|
"""
|
|
)
|
|
# print(pure_s3)
|
|
s3_distributed = node.query(
|
|
"""
|
|
SELECT * FROM
|
|
(
|
|
SELECT * from s3Cluster(
|
|
'cluster_simple',
|
|
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
|
|
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
|
|
UNION ALL
|
|
SELECT * from s3Cluster(
|
|
'cluster_simple',
|
|
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
|
|
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
|
|
)
|
|
ORDER BY (name, value, polygon)
|
|
"""
|
|
)
|
|
# print(s3_distributed)
|
|
|
|
assert TSV(pure_s3) == TSV(s3_distributed)
|
|
|
|
|
|
def test_wrong_cluster(started_cluster):
|
|
node = started_cluster.instances["s0_0_0"]
|
|
error = node.query_and_get_error(
|
|
"""
|
|
SELECT count(*) from s3Cluster(
|
|
'non_existent_cluster',
|
|
'http://minio1:9001/root/data/{clickhouse,database}/*',
|
|
'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
|
|
UNION ALL
|
|
SELECT count(*) from s3Cluster(
|
|
'non_existent_cluster',
|
|
'http://minio1:9001/root/data/{clickhouse,database}/*',
|
|
'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
|
|
"""
|
|
)
|
|
|
|
assert "not found" in error
|
|
|
|
|
|
def test_ambiguous_join(started_cluster):
|
|
node = started_cluster.instances["s0_0_0"]
|
|
result = node.query(
|
|
"""
|
|
SELECT l.name, r.value from s3Cluster(
|
|
'cluster_simple',
|
|
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
|
|
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') as l
|
|
JOIN s3Cluster(
|
|
'cluster_simple',
|
|
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
|
|
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') as r
|
|
ON l.name = r.name
|
|
"""
|
|
)
|
|
assert "AMBIGUOUS_COLUMN_NAME" not in result
|
|
|
|
|
|
def test_skip_unavailable_shards(started_cluster):
|
|
node = started_cluster.instances["s0_0_0"]
|
|
result = node.query(
|
|
"""
|
|
SELECT count(*) from s3Cluster(
|
|
'cluster_non_existent_port',
|
|
'http://minio1:9001/root/data/clickhouse/part1.csv',
|
|
'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
|
|
SETTINGS skip_unavailable_shards = 1
|
|
"""
|
|
)
|
|
|
|
assert result == "10\n"
|
|
|
|
|
|
def test_unset_skip_unavailable_shards(started_cluster):
|
|
# Although skip_unavailable_shards is not set, cluster table functions should always skip unavailable shards.
|
|
node = started_cluster.instances["s0_0_0"]
|
|
result = node.query(
|
|
"""
|
|
SELECT count(*) from s3Cluster(
|
|
'cluster_non_existent_port',
|
|
'http://minio1:9001/root/data/clickhouse/part1.csv',
|
|
'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
|
|
"""
|
|
)
|
|
|
|
assert result == "10\n"
|
|
|
|
|
|
def test_distributed_insert_select_with_replicated(started_cluster):
|
|
first_replica_first_shard = started_cluster.instances["s0_0_0"]
|
|
second_replica_first_shard = started_cluster.instances["s0_0_1"]
|
|
|
|
first_replica_first_shard.query(
|
|
"""DROP TABLE IF EXISTS insert_select_replicated_local ON CLUSTER 'first_shard' SYNC;"""
|
|
)
|
|
|
|
first_replica_first_shard.query(
|
|
"""
|
|
CREATE TABLE insert_select_replicated_local ON CLUSTER 'first_shard' (a String, b UInt64)
|
|
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/insert_select_with_replicated', '{replica}')
|
|
ORDER BY (a, b);
|
|
"""
|
|
)
|
|
|
|
for replica in [first_replica_first_shard, second_replica_first_shard]:
|
|
replica.query(
|
|
"""
|
|
SYSTEM STOP FETCHES;
|
|
"""
|
|
)
|
|
replica.query(
|
|
"""
|
|
SYSTEM STOP MERGES;
|
|
"""
|
|
)
|
|
|
|
first_replica_first_shard.query(
|
|
"""
|
|
INSERT INTO insert_select_replicated_local SELECT * FROM s3Cluster(
|
|
'first_shard',
|
|
'http://minio1:9001/root/data/generated/*.csv', 'minio', 'minio123', 'CSV','a String, b UInt64'
|
|
) SETTINGS parallel_distributed_insert_select=1;
|
|
"""
|
|
)
|
|
|
|
for replica in [first_replica_first_shard, second_replica_first_shard]:
|
|
replica.query(
|
|
"""
|
|
SYSTEM FLUSH LOGS;
|
|
"""
|
|
)
|
|
|
|
assert (
|
|
int(
|
|
second_replica_first_shard.query(
|
|
"""SELECT count(*) FROM system.query_log WHERE not is_initial_query and query ilike '%s3Cluster%';"""
|
|
).strip()
|
|
)
|
|
!= 0
|
|
)
|
|
|
|
# Check whether we inserted at least something
|
|
assert (
|
|
int(
|
|
second_replica_first_shard.query(
|
|
"""SELECT count(*) FROM insert_select_replicated_local;"""
|
|
).strip()
|
|
)
|
|
!= 0
|
|
)
|
|
|
|
first_replica_first_shard.query(
|
|
"""DROP TABLE IF EXISTS insert_select_replicated_local ON CLUSTER 'first_shard' SYNC;"""
|
|
)
|
|
|
|
|
|
def test_parallel_distributed_insert_select_with_schema_inference(started_cluster):
|
|
node = started_cluster.instances["s0_0_0"]
|
|
|
|
node.query(
|
|
"""DROP TABLE IF EXISTS parallel_insert_select ON CLUSTER 'first_shard' SYNC;"""
|
|
)
|
|
|
|
node.query(
|
|
"""
|
|
CREATE TABLE parallel_insert_select ON CLUSTER 'first_shard' (a String, b UInt64)
|
|
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/insert_select_with_replicated', '{replica}')
|
|
ORDER BY (a, b);
|
|
"""
|
|
)
|
|
|
|
node.query(
|
|
"""
|
|
INSERT INTO parallel_insert_select SELECT * FROM s3Cluster(
|
|
'first_shard',
|
|
'http://minio1:9001/root/data/generated/*.csv', 'minio', 'minio123', 'CSV'
|
|
) SETTINGS parallel_distributed_insert_select=1, use_structure_from_insertion_table_in_table_functions=0;
|
|
"""
|
|
)
|
|
|
|
node.query("SYSTEM SYNC REPLICA parallel_insert_select")
|
|
|
|
actual_count = int(
|
|
node.query(
|
|
"SELECT count() FROM s3('http://minio1:9001/root/data/generated/*.csv', 'minio', 'minio123', 'CSV','a String, b UInt64')"
|
|
)
|
|
)
|
|
|
|
count = int(node.query("SELECT count() FROM parallel_insert_select"))
|
|
assert count == actual_count
|
|
|
|
|
|
def test_cluster_with_header(started_cluster):
|
|
node = started_cluster.instances["s0_0_0"]
|
|
assert (
|
|
node.query(
|
|
"SELECT * from s3('http://resolver:8080/bucket/key.csv', headers(MyCustomHeader = 'SomeValue'))"
|
|
)
|
|
== "SomeValue\n"
|
|
)
|
|
assert (
|
|
node.query(
|
|
"SELECT * from s3('http://resolver:8080/bucket/key.csv', headers(MyCustomHeader = 'SomeValue'), 'CSV')"
|
|
)
|
|
== "SomeValue\n"
|
|
)
|
|
assert (
|
|
node.query(
|
|
"SELECT * from s3Cluster('cluster_simple', 'http://resolver:8080/bucket/key.csv', headers(MyCustomHeader = 'SomeValue'))"
|
|
)
|
|
== "SomeValue\n"
|
|
)
|
|
assert (
|
|
node.query(
|
|
"SELECT * from s3Cluster('cluster_simple', 'http://resolver:8080/bucket/key.csv', headers(MyCustomHeader = 'SomeValue'), 'CSV')"
|
|
)
|
|
== "SomeValue\n"
|
|
)
|
|
|
|
|
|
def test_cluster_with_named_collection(started_cluster):
|
|
node = started_cluster.instances["s0_0_0"]
|
|
|
|
pure_s3 = node.query("""SELECT * from s3(test_s3) ORDER BY (c1, c2, c3)""")
|
|
|
|
s3_cluster = node.query(
|
|
"""SELECT * from s3Cluster(cluster_simple, test_s3) ORDER BY (c1, c2, c3)"""
|
|
)
|
|
|
|
assert TSV(pure_s3) == TSV(s3_cluster)
|
|
|
|
s3_cluster = node.query(
|
|
"""SELECT * from s3Cluster(cluster_simple, test_s3, structure='auto') ORDER BY (c1, c2, c3)"""
|
|
)
|
|
|
|
assert TSV(pure_s3) == TSV(s3_cluster)
|