Merge branch 'qoega-patch-6' into improve-integration-tests-3

Yatsishin Ilya 2024-07-24 20:29:01 +00:00
commit b77d1b35d5
3 changed files with 93 additions and 47 deletions

File 1 of 3: Dockerfile

@@ -43,13 +43,11 @@ ENV TZ=Etc/UTC
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ENV DOCKER_CHANNEL stable
-# Unpin the docker version after the release 24.0.3 is released
-# https://github.com/moby/moby/issues/45770#issuecomment-1618255130
RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add - \
&& add-apt-repository "deb https://download.docker.com/linux/ubuntu $(lsb_release -c -s) ${DOCKER_CHANNEL}" \
&& apt-get update \
&& env DEBIAN_FRONTEND=noninteractive apt-get install --yes \
-docker-ce='5:23.*' \
+docker-ce="5:27.0.3*" \
&& rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \

File 2 of 3: docker-compose service definition (HDFS)

@@ -1,7 +1,7 @@
version: '2.3'
services:
hdfs1:
-image: sequenceiq/hadoop-docker:2.7.0
+image: prasanthj/docker-hadoop:2.6.0
hostname: hdfs1
restart: always
expose:

File 3 of 3: HDFS storage integration test (Python)

@@ -1,6 +1,7 @@
import os
import pytest
+import uuid
import time
from helpers.cluster import ClickHouseCluster, is_arm
from helpers.test_tools import TSV
@@ -31,13 +32,15 @@ def started_cluster():
def test_read_write_storage(started_cluster):
+id = uuid.uuid4()
hdfs_api = started_cluster.hdfs_api
+filename = f"simple_storage_{id}"
node1.query("drop table if exists SimpleHDFSStorage SYNC")
node1.query(
"create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/simple_storage', 'TSV')"
f"create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/{filename}', 'TSV')"
)
node1.query("insert into SimpleHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
assert hdfs_api.read_data(f"/{filename}") == "1\tMark\t72.53\n"
assert node1.query("select * from SimpleHDFSStorage") == "1\tMark\t72.53\n"
@@ -92,6 +95,11 @@ def test_read_write_storage_with_globs(started_cluster):
print(ex)
assert "in readonly mode" in str(ex)
node1.query("drop table HDFSStorageWithRange")
node1.query("drop table HDFSStorageWithEnum")
node1.query("drop table HDFSStorageWithQuestionMark")
node1.query("drop table HDFSStorageWithAsterisk")
def test_storage_with_multidirectory_glob(started_cluster):
hdfs_api = started_cluster.hdfs_api
@@ -137,7 +145,6 @@ def test_read_write_table(started_cluster):
def test_write_table(started_cluster):
hdfs_api = started_cluster.hdfs_api
node1.query(
"create table OtherHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/other_storage', 'TSV')"
)
@@ -148,6 +155,8 @@ def test_write_table(started_cluster):
result = "10\ttomas\t55.55\n11\tjack\t32.54\n"
assert hdfs_api.read_data("/other_storage") == result
assert node1.query("select * from OtherHDFSStorage order by id") == result
node1.query("truncate table OtherHDFSStorage")
node1.query("drop table OtherHDFSStorage")
def test_bad_hdfs_uri(started_cluster):
@@ -166,6 +175,7 @@ def test_bad_hdfs_uri(started_cluster):
print(ex)
assert "Unable to connect to HDFS" in str(ex)
node1.query("drop table BadStorage2")
try:
node1.query(
"create table BadStorage3 (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/<>', 'TSV')"
@@ -173,6 +183,7 @@ def test_bad_hdfs_uri(started_cluster):
except Exception as ex:
print(ex)
assert "Unable to open HDFS file" in str(ex)
node1.query("drop table BadStorage3")
@pytest.mark.timeout(800)
@@ -304,6 +315,8 @@ def test_write_gz_storage(started_cluster):
node1.query("insert into GZHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_gzip_data("/storage.gz") == "1\tMark\t72.53\n"
assert node1.query("select * from GZHDFSStorage") == "1\tMark\t72.53\n"
node1.query("truncate table GZHDFSStorage")
node1.query("drop table GZHDFSStorage")
def test_write_gzip_storage(started_cluster):
@@ -315,6 +328,8 @@ def test_write_gzip_storage(started_cluster):
node1.query("insert into GZIPHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_gzip_data("/gzip_storage") == "1\tMark\t72.53\n"
assert node1.query("select * from GZIPHDFSStorage") == "1\tMark\t72.53\n"
node1.query("truncate table GZIPHDFSStorage")
node1.query("drop table GZIPHDFSStorage")
def test_virtual_columns(started_cluster):
@@ -333,6 +348,7 @@ def test_virtual_columns(started_cluster):
)
== expected
)
node1.query("drop table virtual_cols")
def test_read_files_with_spaces(started_cluster):
@@ -354,6 +370,7 @@ def test_read_files_with_spaces(started_cluster):
)
assert node1.query("select * from test order by id") == "1\n2\n3\n"
fs.delete(dir, recursive=True)
node1.query(f"drop table test")
def test_truncate_table(started_cluster):
@@ -375,47 +392,54 @@ def test_truncate_table(started_cluster):
def test_partition_by(started_cluster):
hdfs_api = started_cluster.hdfs_api
+fs = HdfsClient(hosts=started_cluster.hdfs_ip)
+id = uuid.uuid4()
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
dir = f"partition_{id}"
fs.mkdirs(f"/{dir}/", permission=777)
file_name = "test_{_partition_id}"
partition_by = "column3"
values = "(1, 2, 3), (3, 2, 1), (1, 3, 2)"
table_function = f"hdfs('hdfs://hdfs1:9000/{file_name}', 'TSV', '{table_format}')"
table_function = (
f"hdfs('hdfs://hdfs1:9000/{dir}/{file_name}', 'TSV', '{table_format}')"
)
node1.query(
f"insert into table function {table_function} PARTITION BY {partition_by} values {values}"
)
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test_1', 'TSV', '{table_format}')"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_1', 'TSV', '{table_format}')"
)
assert result.strip() == "3\t2\t1"
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test_2', 'TSV', '{table_format}')"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_2', 'TSV', '{table_format}')"
)
assert result.strip() == "1\t3\t2"
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test_3', 'TSV', '{table_format}')"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_3', 'TSV', '{table_format}')"
)
assert result.strip() == "1\t2\t3"
file_name = "test2_{_partition_id}"
node1.query(
f"create table p(column1 UInt32, column2 UInt32, column3 UInt32) engine = HDFS('hdfs://hdfs1:9000/{file_name}', 'TSV') partition by column3"
f"create table p(column1 UInt32, column2 UInt32, column3 UInt32) engine = HDFS('hdfs://hdfs1:9000/{dir}/{file_name}', 'TSV') partition by column3"
)
node1.query(f"insert into p values {values}")
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test2_1', 'TSV', '{table_format}')"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test2_1', 'TSV', '{table_format}')"
)
assert result.strip() == "3\t2\t1"
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test2_2', 'TSV', '{table_format}')"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test2_2', 'TSV', '{table_format}')"
)
assert result.strip() == "1\t3\t2"
result = node1.query(
f"select * from hdfs('hdfs://hdfs1:9000/test2_3', 'TSV', '{table_format}')"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test2_3', 'TSV', '{table_format}')"
)
assert result.strip() == "1\t2\t3"
node1.query(f"drop table p")
fs.delete("/{dir}", recursive=True)
def test_seekable_formats(started_cluster):
@@ -425,7 +449,7 @@ def test_seekable_formats(started_cluster):
f"hdfs('hdfs://hdfs1:9000/parquet', 'Parquet', 'a Int32, b String')"
)
node1.query(
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)"
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000) SETTINGS hdfs_truncate_on_insert=1"
)
result = node1.query(f"SELECT count() FROM {table_function}")
@@ -433,7 +457,7 @@ def test_seekable_formats(started_cluster):
table_function = f"hdfs('hdfs://hdfs1:9000/orc', 'ORC', 'a Int32, b String')"
node1.query(
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)"
f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000) SETTINGS hdfs_truncate_on_insert=1"
)
result = node1.query(f"SELECT count() FROM {table_function}")
assert int(result) == 5000000
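The appended SETTINGS hdfs_truncate_on_insert=1 is what makes these inserts rerunnable: without it, writing to an HDFS path that already exists raises an error, so a file left behind by a previous run would fail the test. A sketch of the pattern as a helper, assuming the same node/query API used above (the helper name overwrite_insert is hypothetical):

def overwrite_insert(node, path, fmt, schema, select_sql):
    # Truncate-and-write: a leftover file from an earlier run cannot
    # make the insert fail.
    node.query(
        f"insert into table function "
        f"hdfs('hdfs://hdfs1:9000/{path}', '{fmt}', '{schema}') "
        f"{select_sql} SETTINGS hdfs_truncate_on_insert=1"
    )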
@@ -457,7 +481,7 @@ def test_read_table_with_default(started_cluster):
def test_schema_inference(started_cluster):
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/native', 'Native', 'a Int32, b String') SELECT number, randomString(100) FROM numbers(5000000)"
f"insert into table function hdfs('hdfs://hdfs1:9000/native', 'Native', 'a Int32, b String') SELECT number, randomString(100) FROM numbers(5000000) SETTINGS hdfs_truncate_on_insert=1"
)
result = node1.query(f"desc hdfs('hdfs://hdfs1:9000/native', 'Native')")
@@ -476,6 +500,7 @@ def test_schema_inference(started_cluster):
result = node1.query(f"select count(*) from schema_inference")
assert int(result) == 5000000
node1.query(f"drop table schema_inference")
def test_hdfsCluster(started_cluster):
@@ -510,6 +535,7 @@ def test_hdfs_directory_not_exist(started_cluster):
assert "" == node1.query(
"select * from HDFSStorageWithNotExistDir settings hdfs_ignore_file_doesnt_exist=1"
)
node1.query("drop table HDFSStorageWithNotExistDir")
def test_overwrite(started_cluster):
@@ -529,12 +555,16 @@ def test_overwrite(started_cluster):
result = node1.query(f"select count() from test_overwrite")
assert int(result) == 10
node1.query(f"truncate table test_overwrite")
node1.query(f"drop table test_overwrite")
def test_multiple_inserts(started_cluster):
hdfs_api = started_cluster.hdfs_api
+fs = HdfsClient(hosts=started_cluster.hdfs_ip)
+id = uuid.uuid4()
+fs.mkdirs(f"/{id}/", permission=777)
-table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts', 'Parquet', 'a Int32, b String')"
+table_function = f"hdfs('hdfs://hdfs1:9000/{id}/data_multiple_inserts', 'Parquet', 'a Int32, b String')"
node1.query(f"create table test_multiple_inserts as {table_function}")
node1.query(
f"insert into test_multiple_inserts select number, randomString(100) from numbers(10)"
@@ -551,7 +581,7 @@ def test_multiple_inserts(started_cluster):
result = node1.query(f"drop table test_multiple_inserts")
table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts.gz', 'Parquet', 'a Int32, b String')"
table_function = f"hdfs('hdfs://hdfs1:9000/{id}/data_multiple_inserts.gz', 'Parquet', 'a Int32, b String')"
node1.query(f"create table test_multiple_inserts as {table_function}")
node1.query(
f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(10)"
@@ -565,6 +595,7 @@ def test_multiple_inserts(started_cluster):
result = node1.query(f"select count() from test_multiple_inserts")
assert int(result) == 60
node1.query(f"drop table test_multiple_inserts")
def test_format_detection(started_cluster):
@@ -574,6 +605,8 @@ def test_format_detection(started_cluster):
node1.query(f"insert into arrow_table select 1")
result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/data.arrow')")
assert int(result) == 1
node1.query(f"truncate table arrow_table")
node1.query(f"drop table arrow_table")
def test_schema_inference_with_globs(started_cluster):
@@ -618,6 +651,8 @@ def test_schema_inference_with_globs(started_cluster):
def test_insert_select_schema_inference(started_cluster):
+fs = HdfsClient(hosts=started_cluster.hdfs_ip)
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/test.native.zst') select toUInt64(1) as x"
)
@@ -627,6 +662,7 @@ def test_insert_select_schema_inference(started_cluster):
result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test.native.zst')")
assert int(result) == 1
fs.delete("/test.native.zst")
def test_cluster_join(started_cluster):
@@ -967,11 +1003,11 @@ def test_read_subcolumns(started_cluster):
node = started_cluster.instances["node1"]
node.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_subcolumns.tsv', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3)"
f"insert into function hdfs('hdfs://hdfs1:9000/test_subcolumns.tsv', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3) settings hdfs_truncate_on_insert=1"
)
node.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_subcolumns.jsonl', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3)"
f"insert into function hdfs('hdfs://hdfs1:9000/test_subcolumns.jsonl', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3) settings hdfs_truncate_on_insert=1"
)
res = node.query(
@@ -1003,7 +1039,7 @@ def test_read_subcolumn_time(started_cluster):
node = started_cluster.instances["node1"]
node.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_subcolumn_time.tsv', auto, 'a UInt32') select (42)"
f"insert into function hdfs('hdfs://hdfs1:9000/test_subcolumn_time.tsv', auto, 'a UInt32') select (42) settings hdfs_truncate_on_insert=1"
)
res = node.query(
@@ -1014,91 +1050,103 @@ def test_read_subcolumn_time(started_cluster):
def test_union_schema_inference_mode(started_cluster):
+id = uuid.uuid4()
+fs = HdfsClient(hosts=started_cluster.hdfs_ip)
+dir = f"union_{id}"
+fs.mkdirs(f"/{dir}/", permission=777)
node = started_cluster.instances["node1"]
node.query(
"insert into function hdfs('hdfs://hdfs1:9000/test_union_schema_inference1.jsonl') select 1 as a"
f"insert into function hdfs('hdfs://hdfs1:9000/{dir}/test_union_schema_inference1.jsonl') select 1 as a"
)
node.query(
"insert into function hdfs('hdfs://hdfs1:9000/test_union_schema_inference2.jsonl') select 2 as b"
f"insert into function hdfs('hdfs://hdfs1:9000/{dir}/test_union_schema_inference2.jsonl') select 2 as b"
)
node.query("system drop schema cache for hdfs")
result = node.query(
"desc hdfs('hdfs://hdfs1:9000/test_union_schema_inference*.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
f"desc hdfs('hdfs://hdfs1:9000/{dir}/test_union_schema_inference*.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert result == "a\tNullable(Int64)\nb\tNullable(Int64)\n"
result = node.query(
"select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache where source like '%test_union_schema_inference%' order by file format TSV"
f"select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache where source like '%test_union_schema_inference%' order by file format TSV"
)
assert (
result == "UNION\ttest_union_schema_inference1.jsonl\ta Nullable(Int64)\n"
"UNION\ttest_union_schema_inference2.jsonl\tb Nullable(Int64)\n"
)
result = node.query(
"select * from hdfs('hdfs://hdfs1:9000/test_union_schema_inference*.jsonl') order by tuple(*) settings schema_inference_mode='union', describe_compact_output=1 format TSV"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_union_schema_inference*.jsonl') order by tuple(*) settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert result == "1\t\\N\n" "\\N\t2\n"
node.query(f"system drop schema cache for hdfs")
result = node.query(
"desc hdfs('hdfs://hdfs1:9000/test_union_schema_inference2.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
f"desc hdfs('hdfs://hdfs1:9000/{dir}/test_union_schema_inference2.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert result == "b\tNullable(Int64)\n"
result = node.query(
"desc hdfs('hdfs://hdfs1:9000/test_union_schema_inference*.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
f"desc hdfs('hdfs://hdfs1:9000/{dir}/test_union_schema_inference*.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert result == "a\tNullable(Int64)\n" "b\tNullable(Int64)\n"
node.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_union_schema_inference3.jsonl', TSV) select 'Error'"
f"insert into function hdfs('hdfs://hdfs1:9000/{dir}/test_union_schema_inference3.jsonl', TSV) select 'Error'"
)
error = node.query_and_get_error(
"desc hdfs('hdfs://hdfs1:9000/test_union_schema_inference*.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
f"desc hdfs('hdfs://hdfs1:9000/{dir}/test_union_schema_inference*.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in error
def test_format_detection(started_cluster):
node = started_cluster.instances["node1"]
+fs = HdfsClient(hosts=started_cluster.hdfs_ip)
+id = uuid.uuid4()
+dir = f"{id}"
+fs.mkdirs(f"/{dir}/", permission=777)
node.query(
"insert into function hdfs('hdfs://hdfs1:9000/test_format_detection0', JSONEachRow) select number as x, 'str_' || toString(number) as y from numbers(0)"
f"insert into function hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection0', JSONEachRow) select number as x, 'str_' || toString(number) as y from numbers(0)"
)
node.query(
"insert into function hdfs('hdfs://hdfs1:9000/test_format_detection1', JSONEachRow) select number as x, 'str_' || toString(number) as y from numbers(10)"
f"insert into function hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection1', JSONEachRow) select number as x, 'str_' || toString(number) as y from numbers(10)"
)
expected_desc_result = node.query(
"desc hdfs('hdfs://hdfs1:9000/test_format_detection1', JSONEachRow)"
f"desc hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection1', JSONEachRow)"
)
desc_result = node.query("desc hdfs('hdfs://hdfs1:9000/test_format_detection1')")
desc_result = node.query(
f"desc hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection1')"
)
assert expected_desc_result == desc_result
expected_result = node.query(
"select * from hdfs('hdfs://hdfs1:9000/test_format_detection1', JSONEachRow, 'x UInt64, y String') order by x, y"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection1', JSONEachRow, 'x UInt64, y String') order by x, y"
)
result = node.query(
"select * from hdfs('hdfs://hdfs1:9000/test_format_detection1') order by x, y"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection1') order by x, y"
)
assert expected_result == result
result = node.query(
"select * from hdfs('hdfs://hdfs1:9000/test_format_detection1', auto, 'x UInt64, y String') order by x, y"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection1', auto, 'x UInt64, y String') order by x, y"
)
assert expected_result == result
result = node.query(
"select * from hdfs('hdfs://hdfs1:9000/test_format_detection{0,1}') order by x, y"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection{{0,1}}') order by x, y"
)
assert expected_result == result
@@ -1106,25 +1154,25 @@ def test_format_detection(started_cluster):
node.query("system drop schema cache for hdfs")
result = node.query(
"select * from hdfs('hdfs://hdfs1:9000/test_format_detection{0,1}') order by x, y"
f"select * from hdfs('hdfs://hdfs1:9000/{dir}/test_format_detection{{0,1}}') order by x, y"
)
assert expected_result == result
result = node.query(
"select * from hdfsCluster(test_cluster_two_shards, 'hdfs://hdfs1:9000/test_format_detection{0,1}') order by x, y"
f"select * from hdfsCluster(test_cluster_two_shards, 'hdfs://hdfs1:9000/{dir}/test_format_detection{{0,1}}') order by x, y"
)
assert expected_result == result
result = node.query(
"select * from hdfsCluster(test_cluster_two_shards, 'hdfs://hdfs1:9000/test_format_detection{0,1}', auto, auto) order by x, y"
f"select * from hdfsCluster(test_cluster_two_shards, 'hdfs://hdfs1:9000/{dir}/test_format_detection{{0,1}}', auto, auto) order by x, y"
)
assert expected_result == result
result = node.query(
"select * from hdfsCluster(test_cluster_two_shards, 'hdfs://hdfs1:9000/test_format_detection{0,1}', auto, 'x UInt64, y String') order by x, y"
f"select * from hdfsCluster(test_cluster_two_shards, 'hdfs://hdfs1:9000/{dir}/test_format_detection{{0,1}}', auto, 'x UInt64, y String') order by x, y"
)
assert expected_result == result
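One detail worth noting in the rewritten queries: once the paths moved into f-strings, the literal glob braces in test_format_detection{0,1} had to be doubled to {{0,1}} so Python does not parse them as a placeholder. A one-line check of that escaping (the directory name is a stand-in):

dir = "some_uuid"  # stand-in for the uuid-named directory used above
assert f"/{dir}/test_format_detection{{0,1}}" == "/some_uuid/test_format_detection{0,1}"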