mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 10:02:01 +00:00
5153798aeb
Added fileCluster function Added test and docs
126 lines
3.9 KiB
Python
126 lines
3.9 KiB
Python
import logging
|
|
import csv
|
|
import time
|
|
|
|
import pytest
|
|
from helpers.cluster import ClickHouseCluster
|
|
from helpers.test_tools import TSV
|
|
|
|
|
|
logging.getLogger().setLevel(logging.INFO)
|
|
logging.getLogger().addHandler(logging.StreamHandler())
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def started_cluster():
|
|
try:
|
|
cluster = ClickHouseCluster(__file__)
|
|
cluster.add_instance(
|
|
"s0_0_0",
|
|
main_configs=["configs/cluster.xml"],
|
|
user_configs=["configs/users.xml"],
|
|
macros={"replica": "node1", "shard": "shard1"},
|
|
with_zookeeper=True,
|
|
)
|
|
cluster.add_instance(
|
|
"s0_0_1",
|
|
main_configs=["configs/cluster.xml"],
|
|
user_configs=["configs/users.xml"],
|
|
macros={"replica": "replica2", "shard": "shard1"},
|
|
with_zookeeper=True,
|
|
)
|
|
cluster.add_instance(
|
|
"s0_1_0",
|
|
main_configs=["configs/cluster.xml"],
|
|
user_configs=["configs/users.xml"],
|
|
macros={"replica": "replica1", "shard": "shard2"},
|
|
with_zookeeper=True,
|
|
)
|
|
|
|
logging.info("Starting cluster...")
|
|
cluster.start()
|
|
logging.info("Cluster started")
|
|
|
|
for node_name in ("s0_0_0", "s0_0_1", "s0_1_0"):
|
|
for i in range(1, 3):
|
|
cluster.instances[node_name].query(
|
|
f"""
|
|
INSERT INTO TABLE FUNCTION file(
|
|
'file{i}.csv', 'CSV', 's String, i UInt32') VALUES ('file{i}',{i})
|
|
"""
|
|
)
|
|
|
|
yield cluster
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def get_query(select: str, cluster: bool, files_nums: str, order_by="ORDER BY (i, s)"):
|
|
if cluster:
|
|
return f"SELECT {select} from fileCluster('my_cluster', 'file{{{files_nums}}}.csv', 'CSV', 's String, i UInt32') {order_by}"
|
|
else:
|
|
return f"SELECT {select} from file('file{{{files_nums}}}.csv', 'CSV', 's String, i UInt32') {order_by}"
|
|
|
|
|
|
def test_select_all(started_cluster):
|
|
node = started_cluster.instances["s0_0_0"]
|
|
|
|
local = node.query(get_query("*", False, "1,2"))
|
|
distributed = node.query(get_query("*", True, "1,2"))
|
|
|
|
assert TSV(local) == TSV(distributed)
|
|
|
|
|
|
def test_count(started_cluster):
|
|
node = started_cluster.instances["s0_0_0"]
|
|
|
|
local = node.query(get_query("count(*)", False, "1,2", ""))
|
|
distributed = node.query(get_query("count(*)", True, "1,2", ""))
|
|
|
|
assert TSV(local) == TSV(distributed)
|
|
|
|
|
|
def test_non_existent_cluster(started_cluster):
|
|
node = started_cluster.instances["s0_0_0"]
|
|
error = node.query_and_get_error(
|
|
"""
|
|
SELECT count(*) from fileCluster(
|
|
'non_existent_cluster', 'file{1,2}.csv', 'CSV', 's String, i UInt32')
|
|
UNION ALL
|
|
SELECT count(*) from fileCluster(
|
|
'non_existent_cluster', 'file{1,2}.csv', 'CSV', 's String, i UInt32')
|
|
"""
|
|
)
|
|
|
|
assert "not found" in error
|
|
|
|
|
|
def test_missing_file(started_cluster):
|
|
"""
|
|
Select from a list of files, _some_ of them don't exist
|
|
"""
|
|
node = started_cluster.instances["s0_0_0"]
|
|
|
|
local_with_missing_file = node.query(get_query("*", False, "1,2,3"))
|
|
local_wo_missing_file = node.query(get_query("*", False, "1,2"))
|
|
|
|
distributed_with_missing_file = node.query(get_query("*", True, "1,2,3"))
|
|
distributed_wo_missing_file = node.query(get_query("*", True, "1,2"))
|
|
|
|
assert TSV(local_with_missing_file) == TSV(distributed_with_missing_file)
|
|
assert TSV(local_wo_missing_file) == TSV(distributed_wo_missing_file)
|
|
assert TSV(local_with_missing_file) == TSV(distributed_wo_missing_file)
|
|
assert TSV(local_wo_missing_file) == TSV(distributed_with_missing_file)
|
|
|
|
|
|
def test_no_such_files(started_cluster):
|
|
"""
|
|
Select from a list of files, _none_ of them don't exist
|
|
"""
|
|
node = started_cluster.instances["s0_0_0"]
|
|
|
|
local = node.query(get_query("*", False, "3,4"))
|
|
distributed = node.query(get_query("*", True, "3,4"))
|
|
|
|
assert TSV(local) == TSV(distributed)
|