ClickHouse/tests/integration/test_storage_hdfs/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

821 lines
28 KiB
Python
Raw Normal View History

import os
import pytest
import time
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
2021-11-10 14:21:25 +00:00
from pyhdfs import HdfsClient
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
2022-08-11 10:55:18 +00:00
"node1",
main_configs=["configs/macro.xml", "configs/schema_cache.xml", "configs/cluster.xml"],
2022-08-11 10:55:18 +00:00
with_hdfs=True,
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_read_write_storage(started_cluster):
2021-06-09 09:23:02 +00:00
hdfs_api = started_cluster.hdfs_api
2021-07-12 08:32:20 +00:00
node1.query("drop table if exists SimpleHDFSStorage SYNC")
node1.query(
"create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/simple_storage', 'TSV')"
)
2019-09-05 14:42:17 +00:00
node1.query("insert into SimpleHDFSStorage values (1, 'Mark', 72.53)")
2021-02-19 12:58:11 +00:00
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
assert node1.query("select * from SimpleHDFSStorage") == "1\tMark\t72.53\n"
def test_read_write_storage_with_globs(started_cluster):
2021-06-09 09:23:02 +00:00
hdfs_api = started_cluster.hdfs_api
node1.query(
"create table HDFSStorageWithRange (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage{1..5}', 'TSV')"
)
node1.query(
"create table HDFSStorageWithEnum (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage{1,2,3,4,5}', 'TSV')"
)
node1.query(
"create table HDFSStorageWithQuestionMark (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage?', 'TSV')"
)
node1.query(
"create table HDFSStorageWithAsterisk (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage*', 'TSV')"
)
2019-09-05 14:42:17 +00:00
2019-09-20 11:26:00 +00:00
for i in ["1", "2", "3"]:
2021-02-19 12:58:11 +00:00
hdfs_api.write_data("/storage" + i, i + "\tMark\t72.53\n")
assert hdfs_api.read_data("/storage" + i) == i + "\tMark\t72.53\n"
2019-09-20 11:26:00 +00:00
assert node1.query("select count(*) from HDFSStorageWithRange") == "3\n"
assert node1.query("select count(*) from HDFSStorageWithEnum") == "3\n"
assert node1.query("select count(*) from HDFSStorageWithQuestionMark") == "3\n"
assert node1.query("select count(*) from HDFSStorageWithAsterisk") == "3\n"
try:
node1.query("insert into HDFSStorageWithEnum values (1, 'NEW', 4.2)")
assert False, "Exception have to be thrown"
except Exception as ex:
2020-10-02 16:54:07 +00:00
print(ex)
2019-09-20 11:26:00 +00:00
assert "in readonly mode" in str(ex)
try:
node1.query("insert into HDFSStorageWithQuestionMark values (1, 'NEW', 4.2)")
assert False, "Exception have to be thrown"
except Exception as ex:
2020-10-02 16:54:07 +00:00
print(ex)
2019-09-20 11:26:00 +00:00
assert "in readonly mode" in str(ex)
try:
node1.query("insert into HDFSStorageWithAsterisk values (1, 'NEW', 4.2)")
assert False, "Exception have to be thrown"
except Exception as ex:
2020-10-02 16:54:07 +00:00
print(ex)
2019-09-20 11:26:00 +00:00
assert "in readonly mode" in str(ex)
2019-09-05 14:42:17 +00:00
def test_read_write_table(started_cluster):
2021-06-09 09:23:02 +00:00
hdfs_api = started_cluster.hdfs_api
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
2021-02-19 12:58:11 +00:00
hdfs_api.write_data("/simple_table_function", data)
2021-02-19 12:58:11 +00:00
assert hdfs_api.read_data("/simple_table_function") == data
assert (
node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')"
)
== data
)
def test_write_table(started_cluster):
2021-06-09 09:23:02 +00:00
hdfs_api = started_cluster.hdfs_api
node1.query(
"create table OtherHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/other_storage', 'TSV')"
)
node1.query(
"insert into OtherHDFSStorage values (10, 'tomas', 55.55), (11, 'jack', 32.54)"
)
result = "10\ttomas\t55.55\n11\tjack\t32.54\n"
2021-02-19 12:58:11 +00:00
assert hdfs_api.read_data("/other_storage") == result
assert node1.query("select * from OtherHDFSStorage order by id") == result
2019-01-19 20:17:19 +00:00
def test_bad_hdfs_uri(started_cluster):
2019-01-19 20:17:19 +00:00
try:
node1.query(
"create table BadStorage1 (id UInt32, name String, weight Float64) ENGINE = HDFS('hads:hgsdfs100500:9000/other_storage', 'TSV')"
)
2019-01-19 20:17:19 +00:00
except Exception as ex:
2020-10-02 16:54:07 +00:00
print(ex)
2021-11-08 11:12:06 +00:00
assert "Bad hdfs url" in str(ex)
2019-01-19 20:17:19 +00:00
try:
node1.query(
"create table BadStorage2 (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs100500:9000/other_storage', 'TSV')"
)
2019-01-19 20:17:19 +00:00
except Exception as ex:
2020-10-02 16:54:07 +00:00
print(ex)
2019-09-20 11:26:00 +00:00
assert "Unable to create builder to connect to HDFS" in str(ex)
2019-01-19 20:17:19 +00:00
try:
node1.query(
"create table BadStorage3 (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/<>', 'TSV')"
)
2019-01-19 20:17:19 +00:00
except Exception as ex:
2020-10-02 16:54:07 +00:00
print(ex)
2019-09-20 11:26:00 +00:00
assert "Unable to open HDFS file" in str(ex)
@pytest.mark.timeout(800)
def test_globs_in_read_table(started_cluster):
2021-06-09 09:23:02 +00:00
hdfs_api = started_cluster.hdfs_api
some_data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
2019-08-09 17:25:29 +00:00
globs_dir = "/dir_for_test_with_globs/"
files = [
"dir1/dir_dir/file1",
"dir2/file2",
"simple_table_function",
"dir/file",
"some_dir/dir1/file",
"some_dir/dir2/file",
"some_dir/file",
"table1_function",
"table2_function",
"table3_function",
]
2019-08-09 17:25:29 +00:00
for filename in files:
2021-02-19 12:58:11 +00:00
hdfs_api.write_data(globs_dir + filename, some_data)
2019-08-09 17:25:29 +00:00
test_requests = [
("dir{1..5}/dir_dir/file1", 1, 1),
("*_table_functio?", 1, 1),
("dir/fil?", 1, 1),
("table{3..8}_function", 1, 1),
("table{2..8}_function", 2, 2),
("dir/*", 1, 1),
("dir/*?*?*?*?*", 1, 1),
("dir/*?*?*?*?*?*", 0, 0),
("some_dir/*/file", 2, 1),
("some_dir/dir?/*", 2, 1),
("*/*/*", 3, 2),
("?", 0, 0),
]
for pattern, paths_amount, files_amount in test_requests:
inside_table_func = (
"'hdfs://hdfs1:9000"
+ globs_dir
+ pattern
+ "', 'TSV', 'id UInt64, text String, number Float64'"
)
print("inside_table_func ", inside_table_func)
assert (
node1.query("select * from hdfs(" + inside_table_func + ")")
== paths_amount * some_data
)
assert node1.query(
"select count(distinct _path) from hdfs(" + inside_table_func + ")"
).rstrip() == str(paths_amount)
assert node1.query(
"select count(distinct _file) from hdfs(" + inside_table_func + ")"
).rstrip() == str(files_amount)
def test_read_write_gzip_table(started_cluster):
2021-06-09 09:23:02 +00:00
hdfs_api = started_cluster.hdfs_api
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
2021-02-19 12:58:11 +00:00
hdfs_api.write_gzip_data("/simple_table_function.gz", data)
2021-02-19 12:58:11 +00:00
assert hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert (
node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64')"
)
== data
)
def test_read_write_gzip_table_with_parameter_gzip(started_cluster):
2021-06-09 09:23:02 +00:00
hdfs_api = started_cluster.hdfs_api
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
2021-02-19 12:58:11 +00:00
hdfs_api.write_gzip_data("/simple_table_function", data)
2021-02-19 12:58:11 +00:00
assert hdfs_api.read_gzip_data("/simple_table_function") == data
assert (
node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64', 'gzip')"
)
== data
)
def test_read_write_table_with_parameter_none(started_cluster):
2021-06-09 09:23:02 +00:00
hdfs_api = started_cluster.hdfs_api
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
2021-02-19 12:58:11 +00:00
hdfs_api.write_data("/simple_table_function.gz", data)
2021-02-19 12:58:11 +00:00
assert hdfs_api.read_data("/simple_table_function.gz") == data
assert (
node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64', 'none')"
)
== data
)
def test_read_write_gzip_table_with_parameter_auto_gz(started_cluster):
2021-06-09 09:23:02 +00:00
hdfs_api = started_cluster.hdfs_api
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
2021-02-19 12:58:11 +00:00
hdfs_api.write_gzip_data("/simple_table_function.gz", data)
2021-02-19 12:58:11 +00:00
assert hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert (
node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64', 'auto')"
)
== data
)
def test_write_gz_storage(started_cluster):
2021-06-09 09:23:02 +00:00
hdfs_api = started_cluster.hdfs_api
node1.query(
"create table GZHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage.gz', 'TSV')"
)
node1.query("insert into GZHDFSStorage values (1, 'Mark', 72.53)")
2021-02-19 12:58:11 +00:00
assert hdfs_api.read_gzip_data("/storage.gz") == "1\tMark\t72.53\n"
assert node1.query("select * from GZHDFSStorage") == "1\tMark\t72.53\n"
def test_write_gzip_storage(started_cluster):
2021-06-09 09:23:02 +00:00
hdfs_api = started_cluster.hdfs_api
node1.query(
"create table GZIPHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/gzip_storage', 'TSV', 'gzip')"
)
node1.query("insert into GZIPHDFSStorage values (1, 'Mark', 72.53)")
2021-02-19 12:58:11 +00:00
assert hdfs_api.read_gzip_data("/gzip_storage") == "1\tMark\t72.53\n"
assert node1.query("select * from GZIPHDFSStorage") == "1\tMark\t72.53\n"
2020-09-09 12:13:20 +00:00
2021-04-20 08:38:14 +00:00
def test_virtual_columns(started_cluster):
2021-06-09 09:23:02 +00:00
hdfs_api = started_cluster.hdfs_api
node1.query(
"create table virtual_cols (id UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/file*', 'TSV')"
)
2021-04-27 17:20:13 +00:00
hdfs_api.write_data("/file1", "1\n")
hdfs_api.write_data("/file2", "2\n")
hdfs_api.write_data("/file3", "3\n")
2021-04-20 08:38:14 +00:00
expected = "1\tfile1\thdfs://hdfs1:9000//file1\n2\tfile2\thdfs://hdfs1:9000//file2\n3\tfile3\thdfs://hdfs1:9000//file3\n"
assert (
node1.query(
"select id, _file as file_name, _path as file_path from virtual_cols order by id"
)
== expected
)
2021-04-20 08:38:14 +00:00
2021-06-21 13:50:09 +00:00
def test_read_files_with_spaces(started_cluster):
2021-06-09 09:23:02 +00:00
hdfs_api = started_cluster.hdfs_api
2021-11-10 14:20:25 +00:00
fs = HdfsClient(hosts=started_cluster.hdfs_ip)
dir = "/test_spaces"
2021-11-10 14:20:25 +00:00
exists = fs.exists(dir)
if exists:
fs.delete(dir, recursive=True)
fs.mkdirs(dir)
hdfs_api.write_data(f"{dir}/test test test 1.txt", "1\n")
hdfs_api.write_data(f"{dir}/test test test 2.txt", "2\n")
hdfs_api.write_data(f"{dir}/test test test 3.txt", "3\n")
node1.query(
f"create table test (id UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/{dir}/test*', 'TSV')"
)
2021-04-19 20:39:22 +00:00
assert node1.query("select * from test order by id") == "1\n2\n3\n"
2021-11-10 14:20:25 +00:00
fs.delete(dir, recursive=True)
2021-04-19 20:39:22 +00:00
def test_truncate_table(started_cluster):
2021-06-21 13:50:09 +00:00
hdfs_api = started_cluster.hdfs_api
node1.query(
"create table test_truncate (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/tr', 'TSV')"
)
2021-06-21 13:50:09 +00:00
node1.query("insert into test_truncate values (1, 'Mark', 72.53)")
assert hdfs_api.read_data("/tr") == "1\tMark\t72.53\n"
assert node1.query("select * from test_truncate") == "1\tMark\t72.53\n"
node1.query("truncate table test_truncate")
assert node1.query("select * from test_truncate") == ""
node1.query("drop table test_truncate")
def test_partition_by(started_cluster):
2021-10-25 16:23:44 +00:00
hdfs_api = started_cluster.hdfs_api
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
file_name = "test_{_partition_id}"
partition_by = "column3"
values = "(1, 2, 3), (3, 2, 1), (1, 3, 2)"
table_function = f"hdfs('hdfs://hdfs1:9000/{file_name}', 'TSV', '{table_format}')"
node1.query(
f"insert into table function {table_function} PARTITION BY {partition_by} values {values}"
)
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test_1', 'TSV', '{table_format}')"
)
assert result.strip() == "3\t2\t1"
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test_2', 'TSV', '{table_format}')"
)
assert result.strip() == "1\t3\t2"
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test_3', 'TSV', '{table_format}')"
)
assert result.strip() == "1\t2\t3"
2021-10-25 16:23:44 +00:00
2021-10-26 12:22:13 +00:00
file_name = "test2_{_partition_id}"
node1.query(
f"create table p(column1 UInt32, column2 UInt32, column3 UInt32) engine = HDFS('hdfs://hdfs1:9000/{file_name}', 'TSV') partition by column3"
)
2021-10-26 12:22:13 +00:00
node1.query(f"insert into p values {values}")
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test2_1', 'TSV', '{table_format}')"
)
assert result.strip() == "3\t2\t1"
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test2_2', 'TSV', '{table_format}')"
)
assert result.strip() == "1\t3\t2"
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test2_3', 'TSV', '{table_format}')"
)
assert result.strip() == "1\t2\t3"
2021-10-26 12:22:13 +00:00
2021-10-25 16:23:44 +00:00
def test_seekable_formats(started_cluster):
2021-10-31 19:53:24 +00:00
hdfs_api = started_cluster.hdfs_api
table_function = (
f"hdfs('hdfs://hdfs1:9000/parquet', 'Parquet', 'a Int32, b String')"
)
node1.query(
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)"
)
2021-10-31 19:53:24 +00:00
result = node1.query(f"SELECT count() FROM {table_function}")
assert int(result) == 5000000
2021-10-31 19:53:24 +00:00
table_function = f"hdfs('hdfs://hdfs1:9000/orc', 'ORC', 'a Int32, b String')"
node1.query(
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)"
)
2021-10-31 19:53:24 +00:00
result = node1.query(f"SELECT count() FROM {table_function}")
assert int(result) == 5000000
2021-10-31 19:53:24 +00:00
2021-12-17 15:34:13 +00:00
def test_read_table_with_default(started_cluster):
hdfs_api = started_cluster.hdfs_api
data = "n\n100\n"
hdfs_api.write_data("/simple_table_function", data)
assert hdfs_api.read_data("/simple_table_function") == data
output = "n\tm\n100\t200\n"
assert (
node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSVWithNames', 'n UInt32, m UInt32 DEFAULT n * 2') FORMAT TSVWithNames"
)
== output
)
def test_schema_inference(started_cluster):
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/native', 'Native', 'a Int32, b String') SELECT number, randomString(100) FROM numbers(5000000)"
)
2021-12-17 15:34:13 +00:00
2021-12-20 21:00:40 +00:00
result = node1.query(f"desc hdfs('hdfs://hdfs1:9000/native', 'Native')")
2021-12-17 15:34:13 +00:00
assert result == "a\tInt32\t\t\t\t\t\nb\tString\t\t\t\t\t\n"
result = node1.query(
f"select count(*) from hdfs('hdfs://hdfs1:9000/native', 'Native')"
)
assert int(result) == 5000000
2021-12-17 15:34:13 +00:00
node1.query(
f"create table schema_inference engine=HDFS('hdfs://hdfs1:9000/native', 'Native')"
)
2021-12-17 15:34:13 +00:00
result = node1.query(f"desc schema_inference")
assert result == "a\tInt32\t\t\t\t\t\nb\tString\t\t\t\t\t\n"
result = node1.query(f"select count(*) from schema_inference")
assert int(result) == 5000000
2021-12-17 15:34:13 +00:00
2021-10-31 19:53:24 +00:00
def test_hdfsCluster(started_cluster):
2021-12-03 05:25:14 +00:00
hdfs_api = started_cluster.hdfs_api
fs = HdfsClient(hosts=started_cluster.hdfs_ip)
dir = "/test_hdfsCluster"
2021-12-03 05:25:14 +00:00
exists = fs.exists(dir)
if exists:
fs.delete(dir, recursive=True)
fs.mkdirs(dir)
hdfs_api.write_data("/test_hdfsCluster/file1", "1\n")
hdfs_api.write_data("/test_hdfsCluster/file2", "2\n")
hdfs_api.write_data("/test_hdfsCluster/file3", "3\n")
actual = node1.query(
"select id, _file as file_name, _path as file_path from hdfs('hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') order by id"
)
2021-12-03 05:25:14 +00:00
expected = "1\tfile1\thdfs://hdfs1:9000/test_hdfsCluster/file1\n2\tfile2\thdfs://hdfs1:9000/test_hdfsCluster/file2\n3\tfile3\thdfs://hdfs1:9000/test_hdfsCluster/file3\n"
assert actual == expected
actual = node1.query(
"select id, _file as file_name, _path as file_path from hdfsCluster('test_cluster_two_shards', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') order by id"
)
2021-12-03 05:25:14 +00:00
expected = "1\tfile1\thdfs://hdfs1:9000/test_hdfsCluster/file1\n2\tfile2\thdfs://hdfs1:9000/test_hdfsCluster/file2\n3\tfile3\thdfs://hdfs1:9000/test_hdfsCluster/file3\n"
assert actual == expected
fs.delete(dir, recursive=True)
def test_hdfs_directory_not_exist(started_cluster):
ddl = "create table HDFSStorageWithNotExistDir (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/data/not_eixst', 'TSV')"
2022-01-18 05:55:41 +00:00
node1.query(ddl)
assert "" == node1.query("select * from HDFSStorageWithNotExistDir")
2021-12-03 05:25:14 +00:00
def test_overwrite(started_cluster):
hdfs_api = started_cluster.hdfs_api
table_function = f"hdfs('hdfs://hdfs1:9000/data', 'Parquet', 'a Int32, b String')"
2022-01-18 19:26:13 +00:00
node1.query(f"create table test_overwrite as {table_function}")
node1.query(
f"insert into test_overwrite select number, randomString(100) from numbers(5)"
)
node1.query_and_get_error(
f"insert into test_overwrite select number, randomString(100) FROM numbers(10)"
)
node1.query(
f"insert into test_overwrite select number, randomString(100) from numbers(10) settings hdfs_truncate_on_insert=1"
)
2022-01-18 19:26:13 +00:00
result = node1.query(f"select count() from test_overwrite")
assert int(result) == 10
def test_multiple_inserts(started_cluster):
hdfs_api = started_cluster.hdfs_api
table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts', 'Parquet', 'a Int32, b String')"
node1.query(f"create table test_multiple_inserts as {table_function}")
node1.query(
f"insert into test_multiple_inserts select number, randomString(100) from numbers(10)"
)
node1.query(
f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings hdfs_create_new_file_on_insert=1"
)
node1.query(
f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings hdfs_create_new_file_on_insert=1"
)
result = node1.query(f"select count() from test_multiple_inserts")
assert int(result) == 60
result = node1.query(f"drop table test_multiple_inserts")
table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts.gz', 'Parquet', 'a Int32, b String')"
node1.query(f"create table test_multiple_inserts as {table_function}")
node1.query(
f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(10)"
)
node1.query(
f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(20) settings hdfs_create_new_file_on_insert=1"
)
node1.query(
f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(30) settings hdfs_create_new_file_on_insert=1"
)
result = node1.query(f"select count() from test_multiple_inserts")
assert int(result) == 60
def test_format_detection(started_cluster):
node1.query(
f"create table arrow_table (x UInt64) engine=HDFS('hdfs://hdfs1:9000/data.arrow')"
)
2022-01-14 11:00:50 +00:00
node1.query(f"insert into arrow_table select 1")
result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/data.arrow')")
assert int(result) == 1
2022-01-14 11:00:50 +00:00
def test_schema_inference_with_globs(started_cluster):
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/data1.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL"
)
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/data2.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select 0"
)
2022-02-09 16:14:14 +00:00
result = node1.query(f"desc hdfs('hdfs://hdfs1:9000/data*.jsoncompacteachrow')")
2022-07-21 16:54:42 +00:00
assert result.strip() == "c1\tNullable(Int64)"
2022-02-09 16:14:14 +00:00
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/data*.jsoncompacteachrow')"
)
assert sorted(result.split()) == ["0", "\\N"]
2022-02-09 16:14:14 +00:00
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/data3.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL"
)
filename = "data{1,3}.jsoncompacteachrow"
2022-06-29 11:10:39 +00:00
result = node1.query_and_get_error(
2022-08-15 12:33:08 +00:00
f"desc hdfs('hdfs://hdfs1:9000/{filename}') settings schema_inference_use_cache_for_hdfs=0"
2022-06-29 11:10:39 +00:00
)
assert "All attempts to extract table structure from files failed" in result
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/data0.jsoncompacteachrow', 'TSV', 'x String') select '[123;]'"
)
2022-04-20 14:34:05 +00:00
result = node1.query_and_get_error(
2022-08-15 12:33:08 +00:00
f"desc hdfs('hdfs://hdfs1:9000/data*.jsoncompacteachrow') settings schema_inference_use_cache_for_hdfs=0"
)
2022-04-20 14:34:05 +00:00
assert (
"Cannot extract table structure from JSONCompactEachRow format file" in result
)
2022-02-09 16:14:14 +00:00
def test_insert_select_schema_inference(started_cluster):
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/test.native.zst') select toUInt64(1) as x"
)
result = node1.query(f"desc hdfs('hdfs://hdfs1:9000/test.native.zst')")
assert result.strip() == "x\tUInt64"
result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test.native.zst')")
assert int(result) == 1
2022-04-07 10:11:49 +00:00
def test_cluster_join(started_cluster):
result = node1.query(
2022-03-31 01:28:07 +00:00
"""
SELECT l.id,r.id FROM hdfsCluster('test_cluster_two_shards', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') as l
JOIN hdfsCluster('test_cluster_two_shards', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') as r
ON l.id = r.id
2022-03-31 01:28:07 +00:00
"""
)
assert "AMBIGUOUS_COLUMN_NAME" not in result
def test_cluster_macro(started_cluster):
with_macro = node1.query(
"""
SELECT id FROM hdfsCluster('{default_cluster_macro}', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32')
"""
)
no_macro = node1.query(
"""
SELECT id FROM hdfsCluster('test_cluster_two_shards', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32')
"""
)
assert TSV(with_macro) == TSV(no_macro)
def test_virtual_columns_2(started_cluster):
2022-03-24 16:10:04 +00:00
hdfs_api = started_cluster.hdfs_api
2022-03-25 09:14:14 +00:00
table_function = (
2022-03-30 15:34:07 +00:00
f"hdfs('hdfs://hdfs1:9000/parquet_2', 'Parquet', 'a Int32, b String')"
2022-03-25 09:14:14 +00:00
)
2022-03-24 16:10:04 +00:00
node1.query(f"insert into table function {table_function} SELECT 1, 'kek'")
result = node1.query(f"SELECT _path FROM {table_function}")
2022-03-31 09:13:38 +00:00
assert result.strip() == "hdfs://hdfs1:9000/parquet_2"
table_function = (
f"hdfs('hdfs://hdfs1:9000/parquet_3', 'Parquet', 'a Int32, _path String')"
)
node1.query(f"insert into table function {table_function} SELECT 1, 'kek'")
result = node1.query(f"SELECT _path FROM {table_function}")
assert result.strip() == "kek"
2022-03-24 16:10:04 +00:00
def get_profile_event_for_query(node, query, profile_event):
node.query("system flush logs")
query = query.replace("'", "\\'")
return int(
node.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
)
)
def check_cache_misses(node1, file, amount=1):
2022-08-11 10:55:18 +00:00
assert (
get_profile_event_for_query(
node1,
f"desc hdfs('hdfs://hdfs1:9000/{file}')",
"SchemaInferenceCacheMisses",
)
== amount
)
def check_cache_hits(node1, file, amount=1):
2022-08-11 10:55:18 +00:00
assert (
get_profile_event_for_query(
node1, f"desc hdfs('hdfs://hdfs1:9000/{file}')", "SchemaInferenceCacheHits"
)
== amount
)
def check_cache_invalidations(node1, file, amount=1):
2022-08-11 10:55:18 +00:00
assert (
get_profile_event_for_query(
node1,
f"desc hdfs('hdfs://hdfs1:9000/{file}')",
"SchemaInferenceCacheInvalidations",
)
== amount
)
def check_cache_evictions(node1, file, amount=1):
2022-08-11 10:55:18 +00:00
assert (
get_profile_event_for_query(
node1,
f"desc hdfs('hdfs://hdfs1:9000/{file}')",
"SchemaInferenceCacheEvictions",
)
== amount
)
def check_cache(node1, expected_files):
sources = node1.query("select source from system.schema_inference_cache")
2022-08-11 10:55:18 +00:00
assert sorted(map(lambda x: x.strip().split("/")[-1], sources.split())) == sorted(
expected_files
)
def run_describe_query(node, file):
query = f"desc hdfs('hdfs://hdfs1:9000/{file}')"
node.query(query)
def test_schema_inference_cache(started_cluster):
2022-08-16 10:39:35 +00:00
node1.query("system drop schema cache")
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache0.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
2022-06-28 16:13:42 +00:00
time.sleep(1)
run_describe_query(node1, "test_cache0.jsonl")
check_cache(node1, ["test_cache0.jsonl"])
check_cache_misses(node1, "test_cache0.jsonl")
run_describe_query(node1, "test_cache0.jsonl")
check_cache_hits(node1, "test_cache0.jsonl")
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache0.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
2022-06-28 16:13:42 +00:00
time.sleep(1)
run_describe_query(node1, "test_cache0.jsonl")
check_cache_invalidations(node1, "test_cache0.jsonl")
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
time.sleep(1)
run_describe_query(node1, "test_cache1.jsonl")
check_cache(node1, ["test_cache0.jsonl", "test_cache1.jsonl"])
check_cache_misses(node1, "test_cache1.jsonl")
run_describe_query(node1, "test_cache1.jsonl")
check_cache_hits(node1, "test_cache1.jsonl")
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache2.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
time.sleep(1)
run_describe_query(node1, "test_cache2.jsonl")
check_cache(node1, ["test_cache1.jsonl", "test_cache2.jsonl"])
check_cache_misses(node1, "test_cache2.jsonl")
check_cache_evictions(node1, "test_cache2.jsonl")
run_describe_query(node1, "test_cache2.jsonl")
check_cache_hits(node1, "test_cache2.jsonl")
run_describe_query(node1, "test_cache1.jsonl")
check_cache_hits(node1, "test_cache1.jsonl")
run_describe_query(node1, "test_cache0.jsonl")
check_cache(node1, ["test_cache0.jsonl", "test_cache1.jsonl"])
check_cache_misses(node1, "test_cache0.jsonl")
check_cache_evictions(node1, "test_cache0.jsonl")
run_describe_query(node1, "test_cache2.jsonl")
check_cache(node1, ["test_cache0.jsonl", "test_cache2.jsonl"])
check_cache_misses(node1, "test_cache2.jsonl")
check_cache_evictions(node1, "test_cache2.jsonl")
run_describe_query(node1, "test_cache2.jsonl")
check_cache_hits(node1, "test_cache2.jsonl")
run_describe_query(node1, "test_cache0.jsonl")
check_cache_hits(node1, "test_cache0.jsonl")
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache3.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
2022-06-28 16:13:42 +00:00
time.sleep(1)
files = "test_cache{0,1,2,3}.jsonl"
run_describe_query(node1, files)
check_cache_hits(node1, files)
node1.query(f"system drop schema cache for hdfs")
check_cache(node1, [])
run_describe_query(node1, files)
check_cache_misses(node1, files, 4)
node1.query("system drop schema cache")
check_cache(node1, [])
run_describe_query(node1, files)
check_cache_misses(node1, files, 4)
def test_test_hdfsCluster_skip_unavailable_shards(started_cluster):
node = started_cluster.instances["node1"]
result = node.query(
"""
SELECT count(*) FROM hdfsCluster(
'cluster_non_existent_port',
'hdfs://hdfs1:9000/test_hdfsCluster/file*',
'TSV',
'id UInt32')
SETTINGS skip_unavailable_shards = 1
"""
)
assert result == "3\n"
def test_test_hdfsCluster_unskip_unavailable_shards(started_cluster):
node = started_cluster.instances["node1"]
error = node.query_and_get_error(
"""
SELECT count(*) FROM hdfsCluster(
'cluster_non_existent_port',
'hdfs://hdfs1:9000/test_hdfsCluster/file*',
'TSV',
'id UInt32')
"""
)
assert "NETWORK_ERROR" in error
if __name__ == "__main__":
2020-09-09 12:13:20 +00:00
cluster.start()
input("Cluster created, press any key to destroy...")
2020-09-09 12:13:20 +00:00
cluster.shutdown()