ClickHouse/tests/integration/test_file_cluster/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

126 lines
3.9 KiB
Python
Raw Normal View History

import logging
import csv
import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
# Send INFO-and-above records from the root logger to a stream handler so the
# fixture's progress messages appear in the test output.
_root_logger = logging.getLogger()
_root_logger.setLevel(logging.INFO)
_root_logger.addHandler(logging.StreamHandler())
@pytest.fixture(scope="module")
def started_cluster():
    """Start a three-instance cluster seeded with CSV files and yield it.

    Instances ``s0_0_0``, ``s0_0_1`` and ``s0_1_0`` are added with the same
    config files and ZooKeeper enabled; only their macros differ. Once the
    cluster is up, ``file1.csv`` and ``file2.csv`` are written on every
    instance through the ``file`` table function, each holding the single
    row ``('file<N>', <N>)``.

    Yields:
        The started ``ClickHouseCluster``, shared by all tests in the module.
        The cluster is shut down when the module's tests are done, or if
        setup fails part-way through.
    """
    # Build the cluster object before entering ``try``: if the constructor
    # raises there is nothing to shut down, and touching the still-unbound
    # ``cluster`` in ``finally`` would replace the real error with an
    # UnboundLocalError.
    cluster = ClickHouseCluster(__file__)

    # (instance name, macros) pairs; everything else is common to all nodes.
    instances = (
        ("s0_0_0", {"replica": "node1", "shard": "shard1"}),
        ("s0_0_1", {"replica": "replica2", "shard": "shard1"}),
        ("s0_1_0", {"replica": "replica1", "shard": "shard2"}),
    )

    try:
        for node_name, macros in instances:
            cluster.add_instance(
                node_name,
                main_configs=["configs/cluster.xml"],
                user_configs=["configs/users.xml"],
                macros=macros,
                with_zookeeper=True,
            )

        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        # Every instance gets its own copy of file1.csv and file2.csv.
        for node_name, _ in instances:
            for i in range(1, 3):
                cluster.instances[node_name].query(
                    f"""
                    INSERT INTO TABLE FUNCTION file(
                        'file{i}.csv', 'CSV', 's String, i UInt32') VALUES ('file{i}',{i})
                    """
                )

        yield cluster
    finally:
        cluster.shutdown()
def get_query(select: str, cluster: bool, files_nums: str, order_by="ORDER BY (i, s)"):
    """Build a SELECT over ``file{<files_nums>}.csv`` for the tests.

    Args:
        select: Expression list placed right after ``SELECT``.
        cluster: When true, read through ``fileCluster('my_cluster', ...)``;
            otherwise read through the plain ``file(...)`` table function.
        files_nums: Comma-separated file numbers inserted between literal
            braces, e.g. ``"1,2"`` produces ``file{1,2}.csv``.
        order_by: Trailing clause appended after a single space. Pass ``""``
            to omit ordering (the query then ends with that space).

    Returns:
        The query text as a single-line string.
    """
    # Both table functions take the same path/format/structure arguments;
    # only the function name and the leading cluster argument differ.
    source_args = f"'file{{{files_nums}}}.csv', 'CSV', 's String, i UInt32'"
    if cluster:
        table_function = f"fileCluster('my_cluster', {source_args})"
    else:
        table_function = f"file({source_args})"
    return f"SELECT {select} from {table_function} {order_by}"
def test_select_all(started_cluster):
    """``SELECT *`` over files 1 and 2 must match between ``file`` and ``fileCluster``."""
    instance = started_cluster.instances["s0_0_0"]

    local_query = get_query(select="*", cluster=False, files_nums="1,2")
    cluster_query = get_query(select="*", cluster=True, files_nums="1,2")

    local_rows = instance.query(local_query)
    cluster_rows = instance.query(cluster_query)

    assert TSV(local_rows) == TSV(cluster_rows)
def test_count(started_cluster):
    """``count(*)`` over files 1 and 2 must match between ``file`` and ``fileCluster``."""
    instance = started_cluster.instances["s0_0_0"]

    # An empty ORDER BY clause is passed explicitly.
    local_query = get_query("count(*)", False, "1,2", order_by="")
    cluster_query = get_query("count(*)", True, "1,2", order_by="")

    local_count = instance.query(local_query)
    cluster_count = instance.query(cluster_query)

    assert TSV(local_count) == TSV(cluster_count)
def test_non_existent_cluster(started_cluster):
    """Querying ``fileCluster`` with an unknown cluster name must report "not found"."""
    instance = started_cluster.instances["s0_0_0"]

    # The same sub-select appears on both sides of the UNION ALL.
    subquery_lines = [
        "SELECT count(*) from fileCluster(",
        "'non_existent_cluster', 'file{1,2}.csv', 'CSV', 's String, i UInt32')",
    ]
    # Empty first and last items reproduce the leading and trailing newline
    # of the query text.
    query = "\n".join(["", *subquery_lines, "UNION ALL", *subquery_lines, ""])

    message = instance.query_and_get_error(query)
    assert "not found" in message
def test_missing_file(started_cluster):
    """
    Query a list of files in which only _some_ of the files exist.

    Listing the extra ``file3.csv`` must not change the result: all four
    combinations of local/cluster reads, with and without the extra file,
    are compared against each other.
    """
    instance = started_cluster.instances["s0_0_0"]

    def select_all(use_cluster, files_nums):
        return instance.query(get_query("*", use_cluster, files_nums))

    local_with_gap = select_all(False, "1,2,3")
    local_complete = select_all(False, "1,2")
    cluster_with_gap = select_all(True, "1,2,3")
    cluster_complete = select_all(True, "1,2")

    assert TSV(local_with_gap) == TSV(cluster_with_gap)
    assert TSV(local_complete) == TSV(cluster_complete)
    assert TSV(local_with_gap) == TSV(cluster_complete)
    assert TSV(local_complete) == TSV(cluster_with_gap)
def test_no_such_files(started_cluster):
    """
    Select from a list of files, _none_ of which exist.

    Files 3 and 4 are never written by the ``started_cluster`` fixture, so
    the local ``file`` read and the ``fileCluster`` read are both expected
    to succeed and return the same result.
    """
    node = started_cluster.instances["s0_0_0"]
    local = node.query(get_query("*", False, "3,4"))
    distributed = node.query(get_query("*", True, "3,4"))
    assert TSV(local) == TSV(distributed)