mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
134 lines
4.5 KiB
Python
134 lines
4.5 KiB
Python
import pytest
|
|
from helpers.cluster import ClickHouseCluster
|
|
from helpers.test_tools import TSV
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
node1 = cluster.add_instance(
|
|
"node1",
|
|
main_configs=["configs/conf.xml", "configs/named_collections.xml"],
|
|
with_nginx=True,
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope="module", autouse=True)
|
|
def setup_node():
|
|
try:
|
|
cluster.start()
|
|
node1.query(
|
|
"insert into table function url(url1) partition by column3 values (1, 2, 3), (3, 2, 1), (1, 3, 2)"
|
|
)
|
|
yield
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def test_partition_by():
|
|
result = node1.query(
|
|
f"select * from url('http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')"
|
|
)
|
|
assert result.strip() == "3\t2\t1"
|
|
result = node1.query(
|
|
f"select * from url('http://nginx:80/test_2', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')"
|
|
)
|
|
assert result.strip() == "1\t3\t2"
|
|
result = node1.query(
|
|
f"select * from url('http://nginx:80/test_3', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')"
|
|
)
|
|
assert result.strip() == "1\t2\t3"
|
|
|
|
|
|
def test_url_cluster():
|
|
result = node1.query(
|
|
f"select * from urlCluster('test_cluster_two_shards', 'http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')"
|
|
)
|
|
assert result.strip() == "3\t2\t1"
|
|
result = node1.query(
|
|
f"select * from urlCluster('test_cluster_two_shards', 'http://nginx:80/test_2', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')"
|
|
)
|
|
assert result.strip() == "1\t3\t2"
|
|
result = node1.query(
|
|
f"select * from urlCluster('test_cluster_two_shards', 'http://nginx:80/test_3', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')"
|
|
)
|
|
assert result.strip() == "1\t2\t3"
|
|
|
|
|
|
def test_url_cluster_with_named_collection():
|
|
result = node1.query(
|
|
f"select * from urlCluster(test_cluster_one_shard_three_replicas_localhost, test_url)"
|
|
)
|
|
assert result.strip() == "3\t2\t1"
|
|
|
|
result = node1.query(
|
|
f"select * from urlCluster(test_cluster_one_shard_three_replicas_localhost, test_url, structure='auto')"
|
|
)
|
|
assert result.strip() == "3\t2\t1"
|
|
|
|
|
|
def test_table_function_url_access_rights():
|
|
node1.query("CREATE USER OR REPLACE u1")
|
|
|
|
expected_error = "necessary to have grant CREATE TEMPORARY TABLE, URL ON *.*"
|
|
assert expected_error in node1.query_and_get_error(
|
|
f"SELECT * FROM url('http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')",
|
|
user="u1",
|
|
)
|
|
|
|
expected_error = "necessary to have grant CREATE TEMPORARY TABLE, URL ON *.*"
|
|
assert expected_error in node1.query_and_get_error(
|
|
f"SELECT * FROM url('http://nginx:80/test_1', 'TSV')", user="u1"
|
|
)
|
|
|
|
assert node1.query(
|
|
f"DESCRIBE TABLE url('http://nginx:80/test_1', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')",
|
|
user="u1",
|
|
) == TSV([["column1", "UInt32"], ["column2", "UInt32"], ["column3", "UInt32"]])
|
|
|
|
assert node1.query(
|
|
f"DESCRIBE TABLE url('http://nginx:80/not-exist', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')",
|
|
user="u1",
|
|
) == TSV([["column1", "UInt32"], ["column2", "UInt32"], ["column3", "UInt32"]])
|
|
|
|
expected_error = "necessary to have grant URL ON *.*"
|
|
assert expected_error in node1.query_and_get_error(
|
|
f"DESCRIBE TABLE url('http://nginx:80/test_1', 'TSV')", user="u1"
|
|
)
|
|
|
|
node1.query("GRANT URL ON *.* TO u1")
|
|
assert node1.query(
|
|
f"DESCRIBE TABLE url('http://nginx:80/test_1', 'TSV')",
|
|
user="u1",
|
|
) == TSV(
|
|
[
|
|
["c1", "Nullable(Int64)"],
|
|
["c2", "Nullable(Int64)"],
|
|
["c3", "Nullable(Int64)"],
|
|
]
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize("file_format", ["Parquet", "CSV", "TSV", "JSONEachRow"])
|
|
def test_file_formats(file_format):
|
|
url = f"http://nginx:80/{file_format}_file"
|
|
|
|
values = ", ".join([f"({i}, {i + 1}, {i + 2})" for i in range(100)])
|
|
node1.query(
|
|
f"insert into table function url(url_file, url = '{url}', format = '{file_format}') values",
|
|
stdin=values,
|
|
)
|
|
|
|
for download_threads in [1, 4, 16]:
|
|
result = node1.query(
|
|
f"""
|
|
SELECT *
|
|
FROM url('{url}', '{file_format}')
|
|
LIMIT 10
|
|
SETTINGS remote_read_min_bytes_for_seek = 1, max_read_buffer_size = 1, max_download_buffer_size = 1, max_download_threads = {download_threads}
|
|
"""
|
|
)
|
|
|
|
expected_result = ""
|
|
for i in range(10):
|
|
expected_result += f"{i}\t{i + 1}\t{i + 2}\n"
|
|
|
|
assert result == expected_result
|