2018-12-05 13:24:45 +00:00
import os
2020-09-16 04:26:10 +00:00
import pytest
2022-06-21 13:02:48 +00:00
import time
2018-12-05 13:24:45 +00:00
from helpers . cluster import ClickHouseCluster
2022-04-27 23:32:49 +00:00
from helpers . test_tools import TSV
2021-11-10 14:21:25 +00:00
from pyhdfs import HdfsClient
2018-12-05 13:24:45 +00:00
# Module-wide test cluster: one ClickHouse instance with HDFS support enabled,
# configured from the macro and schema-cache XML files.
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
    " node1 ",
    with_hdfs=True,
    main_configs=[" configs/macro.xml ", " configs/schema_cache.xml "],
)
2018-12-05 13:24:45 +00:00
2020-09-16 04:26:10 +00:00
2018-12-05 13:24:45 +00:00
@pytest.fixture(scope=" module ")
def started_cluster():
    """Start the shared cluster, yield it to the tests, then shut it down.

    Yields:
        The module-level ``cluster`` object after ``start()`` has been called.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        # Shutdown runs whether start-up, or any test using the fixture, failed.
        cluster.shutdown()
2022-03-22 16:39:58 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_storage(started_cluster):
    """Insert a row through an HDFS-engine table and read it back two ways.

    The row is checked both in the raw HDFS file (via ``hdfs_api``) and
    through a ``select`` on the table.
    """
    hdfs_api = started_cluster.hdfs_api
    expected_row = " 1 \t Mark \t 72.53 \n "

    # Start from a clean slate in case the table already exists.
    node1.query(" drop table if exists SimpleHDFSStorage SYNC ")
    create_ddl = " create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/simple_storage ' , ' TSV ' ) "
    node1.query(create_ddl)
    node1.query(" insert into SimpleHDFSStorage values (1, ' Mark ' , 72.53) ")

    stored = hdfs_api.read_data(" /simple_storage ")
    assert stored == expected_row
    selected = node1.query(" select * from SimpleHDFSStorage ")
    assert selected == expected_row
2020-09-16 04:26:10 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_storage_with_globs(started_cluster):
    """Check HDFS-engine tables whose path contains a glob.

    Four tables are created (range, enum, ``?`` and ``*`` patterns), three
    files are written, every table must count three rows, and inserting into
    the enum / question-mark / asterisk tables must fail with a
    "readonly mode" error.
    """
    hdfs_api = started_cluster.hdfs_api

    create_statements = (
        " create table HDFSStorageWithRange (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/storage { 1..5} ' , ' TSV ' ) ",
        " create table HDFSStorageWithEnum (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/storage { 1,2,3,4,5} ' , ' TSV ' ) ",
        " create table HDFSStorageWithQuestionMark (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/storage? ' , ' TSV ' ) ",
        " create table HDFSStorageWithAsterisk (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/storage* ' , ' TSV ' ) ",
    )
    for ddl in create_statements:
        node1.query(ddl)

    # Write the files the globs are expected to match and verify each write.
    for suffix in (" 1 ", " 2 ", " 3 "):
        path = " /storage " + suffix
        row = suffix + " \t Mark \t 72.53 \n "
        hdfs_api.write_data(path, row)
        assert hdfs_api.read_data(path) == row

    count_queries = (
        " select count(*) from HDFSStorageWithRange ",
        " select count(*) from HDFSStorageWithEnum ",
        " select count(*) from HDFSStorageWithQuestionMark ",
        " select count(*) from HDFSStorageWithAsterisk ",
    )
    for count_query in count_queries:
        assert node1.query(count_query) == " 3 \n "

    insert_queries = (
        " insert into HDFSStorageWithEnum values (1, ' NEW ' , 4.2) ",
        " insert into HDFSStorageWithQuestionMark values (1, ' NEW ' , 4.2) ",
        " insert into HDFSStorageWithAsterisk values (1, ' NEW ' , 4.2) ",
    )
    for insert_query in insert_queries:
        try:
            node1.query(insert_query)
            # Reaching this line means the insert unexpectedly succeeded; the
            # AssertionError is itself caught below and fails the message check.
            assert False, " Exception have to be thrown "
        except Exception as ex:
            print(ex)
            assert " in readonly mode " in str(ex)
2019-09-05 14:42:17 +00:00
2020-09-16 04:26:10 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_table(started_cluster):
    """Write a TSV file to HDFS and read it back with the hdfs() table function."""
    hdfs_api = started_cluster.hdfs_api
    payload = " 1 \t Serialize \t 555.222 \n 2 \t Data \t 777.333 \n "

    hdfs_api.write_data(" /simple_table_function ", payload)
    # Sanity-check the write before involving ClickHouse.
    assert hdfs_api.read_data(" /simple_table_function ") == payload

    selected = node1.query(
        " select * from hdfs( ' hdfs://hdfs1:9000/simple_table_function ' , ' TSV ' , ' id UInt64, text String, number Float64 ' ) "
    )
    assert selected == payload
2019-01-17 14:10:30 +00:00
2022-08-05 16:20:15 +00:00
def test_write_table(started_cluster):
    """Insert two rows into an HDFS-engine table and verify file and table contents."""
    hdfs_api = started_cluster.hdfs_api
    expected_rows = " 10 \t tomas \t 55.55 \n 11 \t jack \t 32.54 \n "

    create_ddl = " create table OtherHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/other_storage ' , ' TSV ' ) "
    insert_dml = " insert into OtherHDFSStorage values (10, ' tomas ' , 55.55), (11, ' jack ' , 32.54) "
    node1.query(create_ddl)
    node1.query(insert_dml)

    assert hdfs_api.read_data(" /other_storage ") == expected_rows
    ordered = node1.query(" select * from OtherHDFSStorage order by id ")
    assert ordered == expected_rows
2019-01-19 20:17:19 +00:00
2020-09-16 04:26:10 +00:00
2022-08-05 16:20:15 +00:00
def test_bad_hdfs_uri(started_cluster):
    """Create tables with malformed or unreachable HDFS URIs and check the error text.

    Each case pairs a ``create table`` statement with the substring that must
    appear in the exception message if the statement raises.
    """
    cases = (
        (
            " create table BadStorage1 (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hads:hgsdfs100500:9000/other_storage ' , ' TSV ' ) ",
            " Bad hdfs url ",
        ),
        (
            " create table BadStorage2 (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs100500:9000/other_storage ' , ' TSV ' ) ",
            " Unable to create builder to connect to HDFS ",
        ),
        (
            " create table BadStorage3 (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/<> ' , ' TSV ' ) ",
            " Unable to open HDFS file ",
        ),
    )
    for ddl, expected_fragment in cases:
        # NOTE(review): when the statement does not raise, the case passes
        # silently; confirm whether that is intended.
        try:
            node1.query(ddl)
        except Exception as ex:
            print(ex)
            assert expected_fragment in str(ex)
2019-08-01 15:46:54 +00:00
2022-03-22 16:39:58 +00:00
2020-09-28 17:20:04 +00:00
@pytest.mark.timeout(800)
def test_globs_in_read_table(started_cluster):
    """Query the hdfs() table function through a series of glob patterns.

    Every file under the test directory holds the same two rows. For each
    pattern the test checks the returned rows (the data repeated once per
    matched path), the number of distinct ``_path`` values and the number of
    distinct ``_file`` values.
    """
    hdfs_api = started_cluster.hdfs_api
    some_data = " 1 \t Serialize \t 555.222 \n 2 \t Data \t 777.333 \n "
    globs_dir = " /dir_for_test_with_globs/ "

    relative_paths = (
        " dir1/dir_dir/file1 ",
        " dir2/file2 ",
        " simple_table_function ",
        " dir/file ",
        " some_dir/dir1/file ",
        " some_dir/dir2/file ",
        " some_dir/file ",
        " table1_function ",
        " table2_function ",
        " table3_function ",
    )
    for relative_path in relative_paths:
        hdfs_api.write_data(globs_dir + relative_path, some_data)

    # (pattern, expected distinct paths, expected distinct file names)
    expectations = (
        (" dir { 1..5}/dir_dir/file1 ", 1, 1),
        (" *_table_functio? ", 1, 1),
        (" dir/fil? ", 1, 1),
        (" table { 3..8}_function ", 1, 1),
        (" table { 2..8}_function ", 2, 2),
        (" dir/* ", 1, 1),
        (" dir/*?*?*?*?* ", 1, 1),
        (" dir/*?*?*?*?*?* ", 0, 0),
        (" some_dir/*/file ", 2, 1),
        (" some_dir/dir?/* ", 2, 1),
        (" */*/* ", 3, 2),
        (" ? ", 0, 0),
    )
    for pattern, paths_amount, files_amount in expectations:
        inside_table_func = "".join(
            (
                " ' hdfs://hdfs1:9000 ",
                globs_dir,
                pattern,
                " ' , ' TSV ' , ' id UInt64, text String, number Float64 ' ",
            )
        )
        print(" inside_table_func ", inside_table_func)

        rows = node1.query(" select * from hdfs( " + inside_table_func + " ) ")
        assert rows == paths_amount * some_data

        distinct_paths = node1.query(
            " select count(distinct _path) from hdfs( " + inside_table_func + " ) "
        )
        assert distinct_paths.rstrip() == str(paths_amount)

        distinct_files = node1.query(
            " select count(distinct _file) from hdfs( " + inside_table_func + " ) "
        )
        assert distinct_files.rstrip() == str(files_amount)
2020-09-16 04:26:10 +00:00
2019-11-19 12:46:07 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_gzip_table(started_cluster):
    """Write gzip data to a ``.gz`` path and read it with hdfs() without a compression argument."""
    hdfs_api = started_cluster.hdfs_api
    payload = " 1 \t Hello Jessica \t 555.222 \n 2 \t I rolled a joint \t 777.333 \n "

    hdfs_api.write_gzip_data(" /simple_table_function.gz ", payload)
    assert hdfs_api.read_gzip_data(" /simple_table_function.gz ") == payload

    selected = node1.query(
        " select * from hdfs( ' hdfs://hdfs1:9000/simple_table_function.gz ' , ' TSV ' , ' id UInt64, text String, number Float64 ' ) "
    )
    assert selected == payload
2020-09-16 04:26:10 +00:00
2019-11-19 12:46:07 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_gzip_table_with_parameter_gzip(started_cluster):
    """Write gzip data to an extension-less path and read it with an explicit 'gzip' argument."""
    hdfs_api = started_cluster.hdfs_api
    payload = " 1 \t Hello Jessica \t 555.222 \n 2 \t I rolled a joint \t 777.333 \n "

    hdfs_api.write_gzip_data(" /simple_table_function ", payload)
    assert hdfs_api.read_gzip_data(" /simple_table_function ") == payload

    selected = node1.query(
        " select * from hdfs( ' hdfs://hdfs1:9000/simple_table_function ' , ' TSV ' , ' id UInt64, text String, number Float64 ' , ' gzip ' ) "
    )
    assert selected == payload
2020-09-16 04:26:10 +00:00
2019-11-19 12:46:07 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_table_with_parameter_none(started_cluster):
    """Write plain data to a ``.gz`` path and read it with an explicit 'none' argument."""
    hdfs_api = started_cluster.hdfs_api
    payload = " 1 \t Hello Jessica \t 555.222 \n 2 \t I rolled a joint \t 777.333 \n "

    # The file is written uncompressed even though its name ends in .gz.
    hdfs_api.write_data(" /simple_table_function.gz ", payload)
    assert hdfs_api.read_data(" /simple_table_function.gz ") == payload

    selected = node1.query(
        " select * from hdfs( ' hdfs://hdfs1:9000/simple_table_function.gz ' , ' TSV ' , ' id UInt64, text String, number Float64 ' , ' none ' ) "
    )
    assert selected == payload
2020-09-16 04:26:10 +00:00
2019-11-19 12:46:07 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_gzip_table_with_parameter_auto_gz(started_cluster):
    """Write gzip data to a ``.gz`` path and read it with an explicit 'auto' argument."""
    hdfs_api = started_cluster.hdfs_api
    payload = " 1 \t Hello Jessica \t 555.222 \n 2 \t I rolled a joint \t 777.333 \n "

    hdfs_api.write_gzip_data(" /simple_table_function.gz ", payload)
    assert hdfs_api.read_gzip_data(" /simple_table_function.gz ") == payload

    selected = node1.query(
        " select * from hdfs( ' hdfs://hdfs1:9000/simple_table_function.gz ' , ' TSV ' , ' id UInt64, text String, number Float64 ' , ' auto ' ) "
    )
    assert selected == payload
2020-09-16 04:26:10 +00:00
2019-11-19 12:46:07 +00:00
2022-08-05 16:20:15 +00:00
def test_write_gz_storage(started_cluster):
    """Insert into an HDFS-engine table backed by a ``.gz`` path and read the row back."""
    hdfs_api = started_cluster.hdfs_api
    expected_row = " 1 \t Mark \t 72.53 \n "

    create_ddl = " create table GZHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/storage.gz ' , ' TSV ' ) "
    node1.query(create_ddl)
    node1.query(" insert into GZHDFSStorage values (1, ' Mark ' , 72.53) ")

    # The stored file is read back through the gzip-aware reader.
    assert hdfs_api.read_gzip_data(" /storage.gz ") == expected_row
    assert node1.query(" select * from GZHDFSStorage ") == expected_row
2020-09-16 04:26:10 +00:00
2022-08-05 16:20:15 +00:00
def test_write_gzip_storage(started_cluster):
    """Insert into an HDFS-engine table declared with an explicit 'gzip' argument."""
    hdfs_api = started_cluster.hdfs_api
    expected_row = " 1 \t Mark \t 72.53 \n "

    create_ddl = " create table GZIPHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/gzip_storage ' , ' TSV ' , ' gzip ' ) "
    node1.query(create_ddl)
    node1.query(" insert into GZIPHDFSStorage values (1, ' Mark ' , 72.53) ")

    # The stored file is read back through the gzip-aware reader.
    assert hdfs_api.read_gzip_data(" /gzip_storage ") == expected_row
    assert node1.query(" select * from GZIPHDFSStorage ") == expected_row
2020-09-09 12:13:20 +00:00
2021-04-20 08:38:14 +00:00
2022-08-05 16:20:15 +00:00
def test_virtual_columns(started_cluster):
    """Select ``_file`` and ``_path`` from a glob-backed HDFS-engine table."""
    hdfs_api = started_cluster.hdfs_api

    node1.query(
        " create table virtual_cols (id UInt32) ENGINE = HDFS( ' hdfs://hdfs1:9000/file* ' , ' TSV ' ) "
    )
    # Three one-row files for the table's glob to pick up.
    for path, content in (
        (" /file1 ", " 1 \n "),
        (" /file2 ", " 2 \n "),
        (" /file3 ", " 3 \n "),
    ):
        hdfs_api.write_data(path, content)

    expected = " 1 \t file1 \t hdfs://hdfs1:9000//file1 \n 2 \t file2 \t hdfs://hdfs1:9000//file2 \n 3 \t file3 \t hdfs://hdfs1:9000//file3 \n "
    actual = node1.query(
        " select id, _file as file_name, _path as file_path from virtual_cols order by id "
    )
    assert actual == expected
2021-04-20 08:38:14 +00:00
2021-06-21 13:50:09 +00:00
2022-08-05 16:20:15 +00:00
def test_read_files_with_spaces(started_cluster):
    """Read files whose names contain spaces through a glob-backed HDFS-engine table.

    The test directory is recreated at the start and removed again at the end.
    """
    hdfs_api = started_cluster.hdfs_api
    fs = HdfsClient(hosts=started_cluster.hdfs_ip)
    base_dir = " /test_spaces "

    # Recreate the directory so leftovers from an earlier run cannot interfere.
    if fs.exists(base_dir):
        fs.delete(base_dir, recursive=True)
    fs.mkdirs(base_dir)

    for number in (1, 2, 3):
        hdfs_api.write_data(
            f" {base_dir} /test test test {number}.txt ", f" {number} \n "
        )

    node1.query(
        f" create table test (id UInt32) ENGINE = HDFS( ' hdfs://hdfs1:9000/ {base_dir} /test* ' , ' TSV ' ) "
    )
    assert node1.query(" select * from test order by id ") == " 1 \n 2 \n 3 \n "
    fs.delete(base_dir, recursive=True)
2021-04-19 20:39:22 +00:00
2022-08-05 16:20:15 +00:00
def test_truncate_table(started_cluster):
    """Insert a row, truncate the HDFS-engine table, and check it reads back empty."""
    hdfs_api = started_cluster.hdfs_api
    expected_row = " 1 \t Mark \t 72.53 \n "
    select_all = " select * from test_truncate "

    node1.query(
        " create table test_truncate (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/tr ' , ' TSV ' ) "
    )
    node1.query(" insert into test_truncate values (1, ' Mark ' , 72.53) ")
    assert hdfs_api.read_data(" /tr ") == expected_row
    assert node1.query(select_all) == expected_row

    node1.query(" truncate table test_truncate ")
    assert node1.query(select_all) == " "

    node1.query(" drop table test_truncate ")
2022-08-05 16:20:15 +00:00
def test_partition_by(started_cluster):
    """Write partitioned data via the hdfs() function and via a table, then read each partition.

    The same three rows are inserted twice: once through
    ``insert into table function ... PARTITION BY`` and once through a table
    declared with ``partition by``. Each resulting per-partition file is then
    read back and compared with the expected row.
    """
    # Not used below; the attribute access is kept as in the original.
    hdfs_api = started_cluster.hdfs_api

    table_format = " column1 UInt32, column2 UInt32, column3 UInt32 "
    partition_by = " column3 "
    values = " (1, 2, 3), (3, 2, 1), (1, 3, 2) "
    expected_by_partition = (
        (1, " 3 \t 2 \t 1 "),
        (2, " 1 \t 3 \t 2 "),
        (3, " 1 \t 2 \t 3 "),
    )

    def check_partitions(prefix):
        # Read every partition file written under the given name prefix.
        for partition_id, expected in expected_by_partition:
            result = node1.query(
                f" select * from hdfs( ' hdfs://hdfs1:9000/{prefix}{partition_id} ' , ' TSV ' , ' {table_format} ' ) "
            )
            assert result.strip() == expected

    # Round 1: partitioned insert through the table function.
    file_name = " test_ {_partition_id} "
    table_function = (
        f" hdfs( ' hdfs://hdfs1:9000/ {file_name} ' , ' TSV ' , ' {table_format} ' ) "
    )
    node1.query(
        f" insert into table function {table_function} PARTITION BY {partition_by} values {values} "
    )
    check_partitions("test_")

    # Round 2: partitioned insert through an HDFS-engine table.
    file_name = " test2_ {_partition_id} "
    node1.query(
        f" create table p(column1 UInt32, column2 UInt32, column3 UInt32) engine = HDFS( ' hdfs://hdfs1:9000/ {file_name} ' , ' TSV ' ) partition by column3 "
    )
    node1.query(f" insert into p values {values} ")
    check_partitions("test2_")
2021-10-26 12:22:13 +00:00
2021-10-25 16:23:44 +00:00
2022-08-05 16:20:15 +00:00
def test_seekable_formats(started_cluster):
    """Write five million rows as Parquet and as ORC, then count them back."""
    # Not used below; the attribute access is kept as in the original.
    hdfs_api = started_cluster.hdfs_api

    targets = (
        " hdfs( ' hdfs://hdfs1:9000/parquet ' , ' Parquet ' , ' a Int32, b String ' ) ",
        " hdfs( ' hdfs://hdfs1:9000/orc ' , ' ORC ' , ' a Int32, b String ' ) ",
    )
    for table_function in targets:
        node1.query(
            f" insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000) "
        )
        row_count = node1.query(f" SELECT count() FROM {table_function} ")
        assert int(row_count) == 5000000
2021-10-31 19:53:24 +00:00
2021-12-17 15:34:13 +00:00
2022-08-05 16:20:15 +00:00
def test_read_table_with_default(started_cluster):
    """Read a one-column file through a structure that declares a DEFAULT column."""
    hdfs_api = started_cluster.hdfs_api
    file_content = " n \n 100 \n "
    expected_output = " n \t m \n 100 \t 200 \n "

    hdfs_api.write_data(" /simple_table_function ", file_content)
    assert hdfs_api.read_data(" /simple_table_function ") == file_content

    # Column m is absent from the file and declared as DEFAULT n * 2.
    selected = node1.query(
        " select * from hdfs( ' hdfs://hdfs1:9000/simple_table_function ' , ' TSVWithNames ' , ' n UInt32, m UInt32 DEFAULT n * 2 ' ) FORMAT TSVWithNames "
    )
    assert selected == expected_output
2021-12-06 03:54:45 +00:00
2022-08-05 16:20:15 +00:00
def test_schema_inference(started_cluster):
    """Check that structure is inferred for a Native file, via hdfs() and via a table."""
    expected_description = " a \t Int32 \t \t \t \t \t \n b \t String \t \t \t \t \t \n "
    expected_rows = 5000000

    node1.query(
        " insert into table function hdfs( ' hdfs://hdfs1:9000/native ' , ' Native ' , ' a Int32, b String ' ) SELECT number, randomString(100) FROM numbers(5000000) "
    )

    # Table function without an explicit structure.
    described = node1.query(" desc hdfs( ' hdfs://hdfs1:9000/native ' , ' Native ' ) ")
    assert described == expected_description
    counted = node1.query(
        " select count(*) from hdfs( ' hdfs://hdfs1:9000/native ' , ' Native ' ) "
    )
    assert int(counted) == expected_rows

    # Table created without a column list.
    node1.query(
        " create table schema_inference engine=HDFS( ' hdfs://hdfs1:9000/native ' , ' Native ' ) "
    )
    described = node1.query(" desc schema_inference ")
    assert described == expected_description
    counted = node1.query(" select count(*) from schema_inference ")
    assert int(counted) == expected_rows
2021-12-17 15:34:13 +00:00
2021-10-31 19:53:24 +00:00
2022-08-05 16:20:15 +00:00
def test_hdfsCluster(started_cluster):
    """Compare hdfs() and hdfsCluster() results over the same three files.

    The test directory is recreated at the start and removed again at the end.
    """
    hdfs_api = started_cluster.hdfs_api
    fs = HdfsClient(hosts=started_cluster.hdfs_ip)
    base_dir = " /test_hdfsCluster "

    # Recreate the directory so leftovers from an earlier run cannot interfere.
    if fs.exists(base_dir):
        fs.delete(base_dir, recursive=True)
    fs.mkdirs(base_dir)

    for path, content in (
        (" /test_hdfsCluster/file1 ", " 1 \n "),
        (" /test_hdfsCluster/file2 ", " 2 \n "),
        (" /test_hdfsCluster/file3 ", " 3 \n "),
    ):
        hdfs_api.write_data(path, content)

    # Both query forms are expected to return exactly the same rows.
    expected = " 1 \t file1 \t hdfs://hdfs1:9000/test_hdfsCluster/file1 \n 2 \t file2 \t hdfs://hdfs1:9000/test_hdfsCluster/file2 \n 3 \t file3 \t hdfs://hdfs1:9000/test_hdfsCluster/file3 \n "

    single_node = node1.query(
        " select id, _file as file_name, _path as file_path from hdfs( ' hdfs://hdfs1:9000/test_hdfsCluster/file* ' , ' TSV ' , ' id UInt32 ' ) order by id "
    )
    assert single_node == expected

    clustered = node1.query(
        " select id, _file as file_name, _path as file_path from hdfsCluster( ' test_cluster_two_shards ' , ' hdfs://hdfs1:9000/test_hdfsCluster/file* ' , ' TSV ' , ' id UInt32 ' ) order by id "
    )
    assert clustered == expected

    fs.delete(base_dir, recursive=True)
2022-03-22 16:39:58 +00:00
2022-08-05 16:20:15 +00:00
def test_hdfs_directory_not_exist(started_cluster):
    """Create a table over the ``/data/not_eixst`` path and expect an empty select result."""
    node1.query(
        " create table HDFSStorageWithNotExistDir (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/data/not_eixst ' , ' TSV ' ) "
    )
    selected = node1.query(" select * from HDFSStorageWithNotExistDir ")
    assert " " == selected
2021-12-03 05:25:14 +00:00
2022-03-22 16:39:58 +00:00
2022-08-05 16:20:15 +00:00
def test_overwrite(started_cluster):
    """Check insert behaviour on an existing file with and without truncate-on-insert.

    After a first insert, a second plain insert is issued through
    ``query_and_get_error``; a third insert with ``hdfs_truncate_on_insert=1``
    is then issued and the table must hold ten rows.
    """
    # Not used below; the attribute access is kept as in the original.
    hdfs_api = started_cluster.hdfs_api

    table_function = " hdfs( ' hdfs://hdfs1:9000/data ' , ' Parquet ' , ' a Int32, b String ' ) "
    node1.query(f" create table test_overwrite as {table_function} ")

    node1.query(
        " insert into test_overwrite select number, randomString(100) from numbers(5) "
    )
    # Second insert without the truncate setting; its error text is not inspected.
    node1.query_and_get_error(
        " insert into test_overwrite select number, randomString(100) FROM numbers(10) "
    )
    node1.query(
        " insert into test_overwrite select number, randomString(100) from numbers(10) settings hdfs_truncate_on_insert=1 "
    )

    row_count = node1.query(" select count() from test_overwrite ")
    assert int(row_count) == 10
2021-12-29 18:03:15 +00:00
2022-08-05 16:20:15 +00:00
def test_multiple_inserts(started_cluster):
    """Insert three batches with ``hdfs_create_new_file_on_insert`` and count all rows.

    The scenario runs twice: once for a plain path and once for a ``.gz``
    path. Each round inserts 10 + 20 + 30 rows and expects 60 in total.
    """
    # Not used below; the attribute access is kept as in the original.
    hdfs_api = started_cluster.hdfs_api

    # Round 1: plain path.
    table_function = " hdfs( ' hdfs://hdfs1:9000/data_multiple_inserts ' , ' Parquet ' , ' a Int32, b String ' ) "
    node1.query(f" create table test_multiple_inserts as {table_function} ")
    for statement in (
        " insert into test_multiple_inserts select number, randomString(100) from numbers(10) ",
        " insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings hdfs_create_new_file_on_insert=1 ",
        " insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings hdfs_create_new_file_on_insert=1 ",
    ):
        node1.query(statement)
    row_count = node1.query(" select count() from test_multiple_inserts ")
    assert int(row_count) == 60

    node1.query(" drop table test_multiple_inserts ")

    # Round 2: same scenario against a path ending in .gz.
    table_function = " hdfs( ' hdfs://hdfs1:9000/data_multiple_inserts.gz ' , ' Parquet ' , ' a Int32, b String ' ) "
    node1.query(f" create table test_multiple_inserts as {table_function} ")
    for statement in (
        " insert into test_multiple_inserts select number, randomString(100) FROM numbers(10) ",
        " insert into test_multiple_inserts select number, randomString(100) FROM numbers(20) settings hdfs_create_new_file_on_insert=1 ",
        " insert into test_multiple_inserts select number, randomString(100) FROM numbers(30) settings hdfs_create_new_file_on_insert=1 ",
    ):
        node1.query(statement)
    row_count = node1.query(" select count() from test_multiple_inserts ")
    assert int(row_count) == 60
2021-12-29 18:03:15 +00:00
2022-08-05 16:20:15 +00:00
def test_format_detection(started_cluster):
    """Create and read a ``.arrow`` file without passing a format argument."""
    node1.query(
        " create table arrow_table (x UInt64) engine=HDFS( ' hdfs://hdfs1:9000/data.arrow ' ) "
    )
    node1.query(" insert into arrow_table select 1 ")

    selected = node1.query(" select * from hdfs( ' hdfs://hdfs1:9000/data.arrow ' ) ")
    assert int(selected) == 1
2022-01-14 11:00:50 +00:00
2021-12-29 18:03:15 +00:00
2022-08-05 16:20:15 +00:00
def test_schema_inference_with_globs(started_cluster):
    """Exercise schema inference across several files matched by a glob.

    Covers a successful inference over two files, a failure when the selected
    files contain only NULLs, and a failure when one matched file cannot be
    parsed as JSONCompactEachRow.
    """
    node1.query(
        " insert into table function hdfs( ' hdfs://hdfs1:9000/data1.jsoncompacteachrow ' , ' JSONCompactEachRow ' , ' x Nullable(UInt32) ' ) select NULL "
    )
    node1.query(
        " insert into table function hdfs( ' hdfs://hdfs1:9000/data2.jsoncompacteachrow ' , ' JSONCompactEachRow ' , ' x Nullable(UInt32) ' ) select 0 "
    )

    described = node1.query(
        " desc hdfs( ' hdfs://hdfs1:9000/data*.jsoncompacteachrow ' ) "
    )
    assert described.strip() == " c1 \t Nullable(Int64) "

    selected = node1.query(
        " select * from hdfs( ' hdfs://hdfs1:9000/data*.jsoncompacteachrow ' ) "
    )
    assert sorted(selected.split()) == [" 0 ", " \\ N "]

    # Add a second NULL-only file and describe just data1 and data3.
    node1.query(
        " insert into table function hdfs( ' hdfs://hdfs1:9000/data3.jsoncompacteachrow ' , ' JSONCompactEachRow ' , ' x Nullable(UInt32) ' ) select NULL "
    )
    filename = " data { 1,3}.jsoncompacteachrow "
    error_text = node1.query_and_get_error(
        f" desc hdfs( ' hdfs://hdfs1:9000/ {filename} ' ) settings schema_inference_use_cache_for_hdfs=0 "
    )
    assert " All attempts to extract table structure from files failed " in error_text

    # Write TSV content under a .jsoncompacteachrow name so parsing it as JSON fails.
    node1.query(
        " insert into table function hdfs( ' hdfs://hdfs1:9000/data0.jsoncompacteachrow ' , ' TSV ' , ' x String ' ) select ' [123;] ' "
    )
    error_text = node1.query_and_get_error(
        " desc hdfs( ' hdfs://hdfs1:9000/data*.jsoncompacteachrow ' ) settings schema_inference_use_cache_for_hdfs=0 "
    )
    assert (
        " Cannot extract table structure from JSONCompactEachRow format file "
        in error_text
    )
2022-04-13 16:59:04 +00:00
2022-02-09 16:14:14 +00:00
2022-08-05 16:20:15 +00:00
def test_insert_select_schema_inference(started_cluster):
    """Insert via hdfs() with only a path, then describe and read the file back."""
    node1.query(
        " insert into table function hdfs( ' hdfs://hdfs1:9000/test.native.zst ' ) select toUInt64(1) as x "
    )

    described = node1.query(" desc hdfs( ' hdfs://hdfs1:9000/test.native.zst ' ) ")
    assert described.strip() == " x \t UInt64 "

    selected = node1.query(" select * from hdfs( ' hdfs://hdfs1:9000/test.native.zst ' ) ")
    assert int(selected) == 1
2022-02-18 16:19:42 +00:00
2022-04-07 10:11:49 +00:00
2022-08-05 16:20:15 +00:00
def test_cluster_join(started_cluster):
    """Join two hdfsCluster() sources on ``id`` and check the output has no AMBIGUOUS_COLUMN_NAME."""
    # The SQL text is kept exactly as written, hence the unindented lines.
    join_query = """
SELECT l . id , r . id FROM hdfsCluster ( ' test_cluster_two_shards ' , ' hdfs://hdfs1:9000/test_hdfsCluster/file* ' , ' TSV ' , ' id UInt32 ' ) as l
JOIN hdfsCluster ( ' test_cluster_two_shards ' , ' hdfs://hdfs1:9000/test_hdfsCluster/file* ' , ' TSV ' , ' id UInt32 ' ) as r
ON l . id = r . id
"""
    joined = node1.query(join_query)
    assert " AMBIGUOUS_COLUMN_NAME " not in joined
2022-02-18 16:19:42 +00:00
2022-08-05 16:20:15 +00:00
def test_cluster_macro(started_cluster):
    """Check that a macro cluster name gives the same rows as the literal cluster name."""
    # The SQL text is kept exactly as written, hence the unindented lines.
    macro_query = """
SELECT id FROM hdfsCluster ( ' {default_cluster_macro} ' , ' hdfs://hdfs1:9000/test_hdfsCluster/file* ' , ' TSV ' , ' id UInt32 ' )
"""
    literal_query = """
SELECT id FROM hdfsCluster ( ' test_cluster_two_shards ' , ' hdfs://hdfs1:9000/test_hdfsCluster/file* ' , ' TSV ' , ' id UInt32 ' )
"""
    with_macro = node1.query(macro_query)
    no_macro = node1.query(literal_query)
    assert TSV(with_macro) == TSV(no_macro)
2022-08-05 16:20:15 +00:00
def test_virtual_columns_2(started_cluster):
    """Check ``_path`` both as a virtual column and as a declared column name.

    With structure ``a, b`` the select of ``_path`` must return the file URI;
    with structure ``a, _path`` it must return the stored value instead.
    """
    # Not used below; the attribute access is kept as in the original.
    hdfs_api = started_cluster.hdfs_api

    # (table function, expected result of "SELECT _path")
    scenarios = (
        (
            " hdfs( ' hdfs://hdfs1:9000/parquet_2 ' , ' Parquet ' , ' a Int32, b String ' ) ",
            " hdfs://hdfs1:9000/parquet_2 ",
        ),
        (
            " hdfs( ' hdfs://hdfs1:9000/parquet_3 ' , ' Parquet ' , ' a Int32, _path String ' ) ",
            " kek ",
        ),
    )
    for table_function, expected_path in scenarios:
        node1.query(f" insert into table function {table_function} SELECT 1, ' kek ' ")
        selected = node1.query(f" SELECT _path FROM {table_function} ")
        assert selected.strip() == expected_path
2022-03-24 16:10:04 +00:00
2022-06-21 13:02:48 +00:00
def get_profile_event_for_query(node, query, profile_event):
    """Return a profile-event counter for the latest finished run of ``query``.

    Args:
        node: Object exposing ``query(sql)``; used to flush logs and read
            ``system.query_log``.
        query: Text of the query to look up; its single quotes are
            backslash-escaped before being embedded in the lookup SQL.
        profile_event: Key to read from the ``ProfileEvents`` column.

    Returns:
        The counter value converted with ``int``.
    """
    # Flush first so the query log is written before it is read.
    node.query(" system flush logs ")
    escaped_query = query.replace(" ' ", " \\ ' ")
    lookup_sql = f" select ProfileEvents[ ' {profile_event} ' ] from system.query_log where query= ' {escaped_query} ' and type = ' QueryFinish ' order by query_start_time_microseconds desc limit 1 "
    raw_value = node.query(lookup_sql)
    return int(raw_value)
2022-06-21 13:02:48 +00:00
2022-08-05 16:20:15 +00:00
def check_cache_misses(node1, file, amount=1):
    """Assert the ``desc hdfs(...)`` query for ``file`` recorded ``amount`` schema-cache misses."""
    describe_query = f" desc hdfs( ' hdfs://hdfs1:9000/ {file} ' ) "
    observed = get_profile_event_for_query(
        node1, describe_query, " SchemaInferenceCacheMisses "
    )
    assert observed == amount
2022-08-05 16:20:15 +00:00
def check_cache_hits(node1, file, amount=1):
    """Assert the ``desc hdfs(...)`` query for ``file`` recorded ``amount`` schema-cache hits."""
    describe_query = f" desc hdfs( ' hdfs://hdfs1:9000/ {file} ' ) "
    observed = get_profile_event_for_query(
        node1, describe_query, " SchemaInferenceCacheHits "
    )
    assert observed == amount
2022-08-05 16:20:15 +00:00
def check_cache_invalidations(node1, file, amount=1):
    """Assert the last finished `desc hdfs(...)` of *file* counted *amount* cache invalidations."""
    describe_query = f"desc hdfs('hdfs://hdfs1:9000/{file}')"
    invalidations = get_profile_event_for_query(
        node1, describe_query, "SchemaInferenceCacheInvalidations"
    )
    assert invalidations == amount
2022-08-05 16:20:15 +00:00
def check_cache_evictions(node1, file, amount=1):
    """Assert the last finished `desc hdfs(...)` of *file* counted *amount* cache evictions."""
    describe_query = f"desc hdfs('hdfs://hdfs1:9000/{file}')"
    evictions = get_profile_event_for_query(
        node1, describe_query, "SchemaInferenceCacheEvictions"
    )
    assert evictions == amount
2022-08-05 16:20:15 +00:00
def check_cache(node1, expected_files):
    """Assert that the schema inference cache holds exactly *expected_files*.

    Only the last path component of every cached `source` is compared, and
    the comparison ignores order.
    """
    raw_sources = node1.query("select source from system.schema_inference_cache")
    # Reduce each cached source to its trailing path component.
    cached_names = [entry.strip().split("/")[-1] for entry in raw_sources.split()]
    cached_names.sort()
    assert cached_names == sorted(expected_files)
2022-08-05 16:20:15 +00:00
def run_describe_query(node, file):
    """Run `desc hdfs(...)` for *file* on *node*; the result is discarded."""
    node.query(f"desc hdfs('hdfs://hdfs1:9000/{file}')")
2022-06-21 13:02:48 +00:00
def _insert_jsonl_test_file(node, file):
    """Overwrite hdfs://hdfs1:9000/<file> with `numbers(100)`, then sleep one second."""
    node.query(
        f"insert into function hdfs('hdfs://hdfs1:9000/{file}') select * from numbers(100) settings hdfs_truncate_on_insert=1"
    )
    # NOTE(review): the pause presumably lets the file's modification time
    # settle before the next describe query inspects it -- confirm.
    time.sleep(1)


def test_schema_inference_cache(started_cluster):
    """Exercise hits, misses, invalidations and evictions of the schema cache.

    Every step runs a `desc hdfs(...)` query and then inspects either the
    cache contents (`check_cache`) or the profile events recorded for that
    query. The order of the steps is significant. The expected cache
    contents asserted below never exceed two entries, so the cache configured
    for node1 is presumably limited to two entries (see
    configs/schema_cache.xml -- confirm).
    """
    node1.query("system drop schema cache")

    # First file: a miss populates the cache, the repeat is a hit.
    _insert_jsonl_test_file(node1, "test_cache0.jsonl")

    run_describe_query(node1, "test_cache0.jsonl")
    check_cache(node1, ["test_cache0.jsonl"])
    check_cache_misses(node1, "test_cache0.jsonl")

    run_describe_query(node1, "test_cache0.jsonl")
    check_cache_hits(node1, "test_cache0.jsonl")

    # Rewriting the file must invalidate its cached schema.
    _insert_jsonl_test_file(node1, "test_cache0.jsonl")

    run_describe_query(node1, "test_cache0.jsonl")
    check_cache_invalidations(node1, "test_cache0.jsonl")

    # Second file: cached alongside the first one.
    _insert_jsonl_test_file(node1, "test_cache1.jsonl")

    run_describe_query(node1, "test_cache1.jsonl")
    check_cache(node1, ["test_cache0.jsonl", "test_cache1.jsonl"])
    check_cache_misses(node1, "test_cache1.jsonl")

    run_describe_query(node1, "test_cache1.jsonl")
    check_cache_hits(node1, "test_cache1.jsonl")

    # Third file: test_cache0.jsonl is expected to be evicted.
    _insert_jsonl_test_file(node1, "test_cache2.jsonl")

    run_describe_query(node1, "test_cache2.jsonl")
    check_cache(node1, ["test_cache1.jsonl", "test_cache2.jsonl"])
    check_cache_misses(node1, "test_cache2.jsonl")
    check_cache_evictions(node1, "test_cache2.jsonl")

    run_describe_query(node1, "test_cache2.jsonl")
    check_cache_hits(node1, "test_cache2.jsonl")

    run_describe_query(node1, "test_cache1.jsonl")
    check_cache_hits(node1, "test_cache1.jsonl")

    # Re-describing an evicted file misses and evicts another entry.
    run_describe_query(node1, "test_cache0.jsonl")
    check_cache(node1, ["test_cache0.jsonl", "test_cache1.jsonl"])
    check_cache_misses(node1, "test_cache0.jsonl")
    check_cache_evictions(node1, "test_cache0.jsonl")

    run_describe_query(node1, "test_cache2.jsonl")
    check_cache(node1, ["test_cache0.jsonl", "test_cache2.jsonl"])
    check_cache_misses(node1, "test_cache2.jsonl")
    check_cache_evictions(node1, "test_cache2.jsonl")

    run_describe_query(node1, "test_cache2.jsonl")
    check_cache_hits(node1, "test_cache2.jsonl")

    run_describe_query(node1, "test_cache0.jsonl")
    check_cache_hits(node1, "test_cache0.jsonl")

    # Glob over all four files: expected to be answered from the cache.
    _insert_jsonl_test_file(node1, "test_cache3.jsonl")

    files = "test_cache{0,1,2,3}.jsonl"
    run_describe_query(node1, files)
    check_cache_hits(node1, files)

    # Dropping the cache (HDFS-only, then globally) empties it; the glob
    # describe is then expected to miss once per file.
    node1.query("system drop schema cache for hdfs")
    check_cache(node1, [])

    run_describe_query(node1, files)
    check_cache_misses(node1, files, 4)

    node1.query("system drop schema cache")
    check_cache(node1, [])

    run_describe_query(node1, files)
    check_cache_misses(node1, files, 4)
2022-06-21 13:02:48 +00:00
2022-03-22 16:39:58 +00:00
if __name__ == "__main__":
    # Manual debugging entry point: bring the cluster up and keep it alive
    # until the user confirms on stdin.
    try:
        cluster.start()
        input("Cluster created, press any key to destroy...")
    finally:
        # Same teardown guarantee as the started_cluster fixture: shut the
        # cluster down even if start() or input() raises (e.g. Ctrl-C, EOF).
        cluster.shutdown()