ClickHouse/tests/integration/test_disks_app_func/test.py

191 lines
4.0 KiB
Python
Raw Normal View History

from helpers.cluster import ClickHouseCluster
import pytest
@pytest.fixture(scope="module")
def started_cluster():
global cluster
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"disks_app_test",
main_configs=["config.xml"],
)
cluster.start()
yield cluster
finally:
cluster.shutdown()
def init_data(source):
source.query("DROP TABLE IF EXISTS test_table")
source.query(
"CREATE TABLE test_table(word String, value UInt64) "
"ENGINE=MergeTree() "
"ORDER BY word SETTINGS storage_policy = 'test1'"
)
source.query("INSERT INTO test_table(*) VALUES ('test1', 2)")
def test_disks_app_func_ld(started_cluster):
source = cluster.instances["disks_app_test"]
out = source.exec_in_container(["/usr/bin/clickhouse", "disks", "list-disks"])
disks = out.split("\n")
assert disks[0] == "default" and disks[1] == "test1" and disks[2] == "test2"
def test_disks_app_func_ls(started_cluster):
source = cluster.instances["disks_app_test"]
init_data(source)
out = source.exec_in_container(
["/usr/bin/clickhouse", "disks", "--disk", "test1", "list", "."]
)
files = out.split("\n")
assert files[0] == "store"
def test_disks_app_func_cp(started_cluster):
source = cluster.instances["disks_app_test"]
init_data(source)
source.exec_in_container(
[
"/usr/bin/clickhouse",
"disks",
"copy",
"--diskFrom",
"test1",
"--diskTo",
"test2",
".",
".",
]
)
out = source.exec_in_container(
["/usr/bin/clickhouse", "disks", "--disk", "test2", "list", "."]
)
files = out.split("\n")
assert files[0] == "path1"
def test_disks_app_func_ln(started_cluster):
source = cluster.instances["disks_app_test"]
init_data(source)
source.exec_in_container(
[
"/usr/bin/clickhouse",
"disks",
"link",
"data/default/test_table",
"data/default/z_tester",
]
)
out = source.exec_in_container(
["/usr/bin/clickhouse", "disks", "list", "data/default/"]
)
files = out.split("\n")
assert "z_tester" in files
def test_disks_app_func_rm(started_cluster):
source = cluster.instances["disks_app_test"]
init_data(source)
source.exec_in_container(
[
"/usr/bin/clickhouse",
"disks",
"copy",
"--diskFrom",
"test1",
"--diskTo",
"test2",
".",
".",
]
)
source.exec_in_container(
["/usr/bin/clickhouse", "disks", "--disk", "test2", "remove", "path1"]
)
out = source.exec_in_container(
["/usr/bin/clickhouse", "disks", "--disk", "test2", "list", "."]
)
files = out.split("\n")
assert files[0] == ""
def test_disks_app_func_mv(started_cluster):
source = cluster.instances["disks_app_test"]
init_data(source)
source.exec_in_container(
[
"/usr/bin/clickhouse",
"disks",
"--disk",
"test1",
"move",
"store",
"old_store",
]
)
out = source.exec_in_container(
["/usr/bin/clickhouse", "disks", "--disk", "test1", "list", "."]
)
files = out.split("\n")
assert files[0] == "old_store"
def test_disks_app_func_read_write(started_cluster):
source = cluster.instances["disks_app_test"]
source.exec_in_container(
[
"bash",
"-c",
"echo 'tester' |"
+ " ".join(
["/usr/bin/clickhouse", "disks", "--disk", "test1", "write", "5.txt"]
),
]
)
out = source.exec_in_container(
["/usr/bin/clickhouse", "disks", "--disk", "test1", "read", "5.txt"]
)
files = out.split("\n")
assert files[0] == "tester"