2018-12-05 13:24:45 +00:00
import os
2020-09-16 04:26:10 +00:00
import pytest
2022-06-21 13:02:48 +00:00
import time
2024-03-20 14:05:06 +00:00
from helpers . cluster import ClickHouseCluster , is_arm
2022-04-27 23:32:49 +00:00
from helpers . test_tools import TSV
2021-11-10 14:21:25 +00:00
from pyhdfs import HdfsClient
2018-12-05 13:24:45 +00:00
2024-03-20 14:05:06 +00:00
# pytest applies a module-level ``pytestmark`` to every test in the file,
# so the whole module is skipped whenever is_arm() is true.
if is_arm():
    pytestmark = pytest.mark.skip

# A single ClickHouse instance with HDFS enabled and three extra server
# configuration files; all tests below talk to this instance.
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
    " node1 ",
    with_hdfs=True,
    main_configs=[
        " configs/macro.xml ",
        " configs/schema_cache.xml ",
        " configs/cluster.xml ",
    ],
)
2018-12-05 13:24:45 +00:00
2020-09-16 04:26:10 +00:00
2018-12-05 13:24:45 +00:00
@pytest.fixture(scope=" module ")
def started_cluster():
    """Start the shared cluster, yield it to the tests, and always shut it down.

    ``cluster.start()`` sits inside the ``try`` so that ``shutdown()`` also
    runs when start-up itself fails.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
2022-03-22 16:39:58 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_storage(started_cluster):
    """Insert one row through an HDFS-engine table and read it back two ways."""
    expected_row = " 1 \t Mark \t 72.53 \n "
    hdfs_api = started_cluster.hdfs_api

    # Drop any table left over from a previous run before re-creating it.
    node1.query(" drop table if exists SimpleHDFSStorage SYNC ")
    node1.query(
        " create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/simple_storage ' , ' TSV ' ) "
    )
    node1.query(" insert into SimpleHDFSStorage values (1, ' Mark ' , 72.53) ")

    # The row must be visible both in the raw HDFS file and via the table.
    assert hdfs_api.read_data(" /simple_storage ") == expected_row
    assert node1.query(" select * from SimpleHDFSStorage ") == expected_row
2020-09-16 04:26:10 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_storage_with_globs(started_cluster):
    """Check that glob-path HDFS tables read all matching files and reject inserts.

    Four tables are created over a range glob, an enum glob, a ``?`` glob
    and a ``*`` glob. Three files are written to HDFS and every table must
    count three rows. Inserts into the enum, ``?`` and ``*`` tables must
    fail with a readonly-mode error.
    """
    hdfs_api = started_cluster.hdfs_api

    # (table name, glob suffix appended to ".../storage")
    glob_tables = (
        ("HDFSStorageWithRange", " { 1..5}"),
        ("HDFSStorageWithEnum", " { 1,2,3,4,5}"),
        ("HDFSStorageWithQuestionMark", "?"),
        ("HDFSStorageWithAsterisk", "*"),
    )
    for table, glob in glob_tables:
        node1.query(
            f" create table {table} (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/storage{glob} ' , ' TSV ' ) "
        )

    for i in [" 1 ", " 2 ", " 3 "]:
        hdfs_api.write_data(" /storage " + i, i + " \t Mark \t 72.53 \n ")
        assert hdfs_api.read_data(" /storage " + i) == i + " \t Mark \t 72.53 \n "

    for table, _ in glob_tables:
        assert node1.query(f" select count(*) from {table} ") == " 3 \n "

    # The range table is not part of the insert check.
    for table in (
        "HDFSStorageWithEnum",
        "HDFSStorageWithQuestionMark",
        "HDFSStorageWithAsterisk",
    ):
        # pytest.raises fails the test directly when no exception is raised,
        # instead of relying on an ``assert False`` that a broad
        # ``except Exception`` handler would itself catch.
        with pytest.raises(Exception) as exc_info:
            node1.query(f" insert into {table} values (1, ' NEW ' , 4.2) ")
        print(exc_info.value)
        assert " in readonly mode " in str(exc_info.value)
2019-09-05 14:42:17 +00:00
2020-09-16 04:26:10 +00:00
2023-06-11 00:09:05 +00:00
def test_storage_with_multidirectory_glob(started_cluster):
    """Read files through a glob spanning several directory levels.

    Two files are written under ``/multiglob``; a multi-directory glob must
    return both rows (in either order), and a glob that matches nothing
    must raise an error mentioning that there are no files.
    """
    hdfs_api = started_cluster.hdfs_api
    for i in [" 1 ", " 2 "]:
        path = f" /multiglob/p { i } /path { i } /postfix/data { i } "
        payload = f" File { i } \t { i } { i } \n "
        hdfs_api.write_data(path, payload)
        assert hdfs_api.read_data(path) == payload

    r = node1.query(
        " SELECT * FROM hdfs( ' hdfs://hdfs1:9000/multiglob/ { p1/path1,p2/path2}/postfix/data { 1,2} ' , TSV) "
    )
    # File order is not fixed, so accept both permutations.
    assert r in (
        " File1 \t 11 \n File2 \t 22 \n ",
        " File2 \t 22 \n File1 \t 11 \n ",
    )

    # pytest.raises reports a missing exception itself; an ``assert False``
    # inside ``try`` would be swallowed by a broad ``except Exception``.
    with pytest.raises(Exception) as exc_info:
        node1.query(
            " SELECT * FROM hdfs( ' hdfs://hdfs1:9000/multiglob/ { p4/path1,p2/path3}/postfix/data { 1,2}.nonexist ' , TSV) "
        )
    print(exc_info.value)
    assert " no files " in str(exc_info.value)
2023-06-11 00:09:05 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_table(started_cluster):
    """Write a TSV file to HDFS and read it back with the hdfs() table function."""
    hdfs_api = started_cluster.hdfs_api
    data = " 1 \t Serialize \t 555.222 \n 2 \t Data \t 777.333 \n "

    hdfs_api.write_data(" /simple_table_function ", data)
    assert hdfs_api.read_data(" /simple_table_function ") == data

    selected = node1.query(
        " select * from hdfs( ' hdfs://hdfs1:9000/simple_table_function ' , ' TSV ' , ' id UInt64, text String, number Float64 ' ) "
    )
    assert selected == data
2019-01-17 14:10:30 +00:00
2022-08-05 16:20:15 +00:00
def test_write_table(started_cluster):
    """Insert two rows into an HDFS-engine table and verify file and table contents."""
    hdfs_api = started_cluster.hdfs_api
    expected = " 10 \t tomas \t 55.55 \n 11 \t jack \t 32.54 \n "

    create_query = " create table OtherHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/other_storage ' , ' TSV ' ) "
    insert_query = " insert into OtherHDFSStorage values (10, ' tomas ' , 55.55), (11, ' jack ' , 32.54) "
    node1.query(create_query)
    node1.query(insert_query)

    assert hdfs_api.read_data(" /other_storage ") == expected
    assert node1.query(" select * from OtherHDFSStorage order by id ") == expected
2019-01-19 20:17:19 +00:00
2020-09-16 04:26:10 +00:00
2022-08-05 16:20:15 +00:00
def test_bad_hdfs_uri(started_cluster):
    """Check the error text produced for three malformed or unreachable HDFS URIs."""
    # (DDL statement, fragment expected in the error message)
    cases = (
        (
            " create table BadStorage1 (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hads:hgsdfs100500:9000/other_storage ' , ' TSV ' ) ",
            " Bad hdfs url ",
        ),
        (
            " create table BadStorage2 (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs100500:9000/other_storage ' , ' TSV ' ) ",
            " Unable to create builder to connect to HDFS ",
        ),
        (
            " create table BadStorage3 (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/<> ' , ' TSV ' ) ",
            " Unable to open HDFS file ",
        ),
    )
    for ddl, expected_fragment in cases:
        # NOTE(review): a statement that succeeds is not treated as a failure
        # here; only the message of a raised error is checked. Confirm this
        # leniency is intended.
        try:
            node1.query(ddl)
        except Exception as ex:
            print(ex)
            assert expected_fragment in str(ex)
2019-08-01 15:46:54 +00:00
2022-03-22 16:39:58 +00:00
2020-09-28 17:20:04 +00:00
@pytest.mark.timeout(800)
def test_globs_in_read_table(started_cluster):
    """Exercise glob patterns in the hdfs() table function.

    The same two-row payload is written to a fixed set of files. For each
    pattern the test checks the returned data (payload repeated once per
    matched path) and the number of distinct ``_path`` and ``_file`` values.
    """
    hdfs_api = started_cluster.hdfs_api
    some_data = " 1 \t Serialize \t 555.222 \n 2 \t Data \t 777.333 \n "
    globs_dir = " /dir_for_test_with_globs/ "

    for filename in [
        " dir1/dir_dir/file1 ",
        " dir2/file2 ",
        " simple_table_function ",
        " dir/file ",
        " some_dir/dir1/file ",
        " some_dir/dir2/file ",
        " some_dir/file ",
        " table1_function ",
        " table2_function ",
        " table3_function ",
    ]:
        hdfs_api.write_data(globs_dir + filename, some_data)

    # (glob pattern, expected distinct paths, expected distinct file names)
    test_requests = [
        (" dir { 1..5}/dir_dir/file1 ", 1, 1),
        (" *_table_functio? ", 1, 1),
        (" dir/fil? ", 1, 1),
        (" table { 3..8}_function ", 1, 1),
        (" table { 2..8}_function ", 2, 2),
        (" dir/* ", 1, 1),
        (" dir/*?*?*?*?* ", 1, 1),
        (" dir/*?*?*?*?*?* ", 0, 0),
        (" some_dir/*/file ", 2, 1),
        (" some_dir/dir?/* ", 2, 1),
        (" */*/* ", 3, 2),
        (" ? ", 0, 0),
    ]
    for pattern, paths_amount, files_amount in test_requests:
        inside_table_func = f" ' hdfs://hdfs1:9000 {globs_dir}{pattern} ' , ' TSV ' , ' id UInt64, text String, number Float64 ' "
        print(" inside_table_func ", inside_table_func)

        rows = node1.query(f" select * from hdfs( {inside_table_func} ) ")
        assert rows == paths_amount * some_data

        distinct_paths = node1.query(
            f" select count(distinct _path) from hdfs( {inside_table_func} ) "
        )
        assert distinct_paths.rstrip() == str(paths_amount)

        distinct_files = node1.query(
            f" select count(distinct _file) from hdfs( {inside_table_func} ) "
        )
        assert distinct_files.rstrip() == str(files_amount)
2020-09-16 04:26:10 +00:00
2019-11-19 12:46:07 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_gzip_table(started_cluster):
    """Read a gzip-written ``.gz`` file via hdfs() without naming a compression."""
    hdfs_api = started_cluster.hdfs_api
    data = " 1 \t Hello Jessica \t 555.222 \n 2 \t I rolled a joint \t 777.333 \n "
    gz_path = " /simple_table_function.gz "

    hdfs_api.write_gzip_data(gz_path, data)
    assert hdfs_api.read_gzip_data(gz_path) == data

    selected = node1.query(
        " select * from hdfs( ' hdfs://hdfs1:9000/simple_table_function.gz ' , ' TSV ' , ' id UInt64, text String, number Float64 ' ) "
    )
    assert selected == data
2020-09-16 04:26:10 +00:00
2019-11-19 12:46:07 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_gzip_table_with_parameter_gzip(started_cluster):
    """Read a gzip-written file without a ``.gz`` suffix by passing ``gzip`` explicitly."""
    hdfs_api = started_cluster.hdfs_api
    data = " 1 \t Hello Jessica \t 555.222 \n 2 \t I rolled a joint \t 777.333 \n "
    path = " /simple_table_function "

    hdfs_api.write_gzip_data(path, data)
    assert hdfs_api.read_gzip_data(path) == data

    selected = node1.query(
        " select * from hdfs( ' hdfs://hdfs1:9000/simple_table_function ' , ' TSV ' , ' id UInt64, text String, number Float64 ' , ' gzip ' ) "
    )
    assert selected == data
2020-09-16 04:26:10 +00:00
2019-11-19 12:46:07 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_table_with_parameter_none(started_cluster):
    """Read a plain-written ``.gz``-named file by passing compression ``none``."""
    hdfs_api = started_cluster.hdfs_api
    data = " 1 \t Hello Jessica \t 555.222 \n 2 \t I rolled a joint \t 777.333 \n "
    path = " /simple_table_function.gz "

    # Written with write_data (not write_gzip_data) despite the .gz name.
    hdfs_api.write_data(path, data)
    assert hdfs_api.read_data(path) == data

    selected = node1.query(
        " select * from hdfs( ' hdfs://hdfs1:9000/simple_table_function.gz ' , ' TSV ' , ' id UInt64, text String, number Float64 ' , ' none ' ) "
    )
    assert selected == data
2020-09-16 04:26:10 +00:00
2019-11-19 12:46:07 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_gzip_table_with_parameter_auto_gz(started_cluster):
    """Read a gzip-written ``.gz`` file with the compression argument set to ``auto``."""
    hdfs_api = started_cluster.hdfs_api
    data = " 1 \t Hello Jessica \t 555.222 \n 2 \t I rolled a joint \t 777.333 \n "
    gz_path = " /simple_table_function.gz "

    hdfs_api.write_gzip_data(gz_path, data)
    assert hdfs_api.read_gzip_data(gz_path) == data

    selected = node1.query(
        " select * from hdfs( ' hdfs://hdfs1:9000/simple_table_function.gz ' , ' TSV ' , ' id UInt64, text String, number Float64 ' , ' auto ' ) "
    )
    assert selected == data
2020-09-16 04:26:10 +00:00
2019-11-19 12:46:07 +00:00
2022-08-05 16:20:15 +00:00
def test_write_gz_storage(started_cluster):
    """Insert through an HDFS table whose path ends in ``.gz`` and read it back."""
    hdfs_api = started_cluster.hdfs_api
    expected_row = " 1 \t Mark \t 72.53 \n "

    node1.query(
        " create table GZHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/storage.gz ' , ' TSV ' ) "
    )
    node1.query(" insert into GZHDFSStorage values (1, ' Mark ' , 72.53) ")

    # The file is read back with the gzip reader of the HDFS helper.
    assert hdfs_api.read_gzip_data(" /storage.gz ") == expected_row
    assert node1.query(" select * from GZHDFSStorage ") == expected_row
2020-09-16 04:26:10 +00:00
2022-08-05 16:20:15 +00:00
def test_write_gzip_storage(started_cluster):
    """Insert through an HDFS table created with an explicit ``gzip`` argument."""
    hdfs_api = started_cluster.hdfs_api
    expected_row = " 1 \t Mark \t 72.53 \n "

    node1.query(
        " create table GZIPHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/gzip_storage ' , ' TSV ' , ' gzip ' ) "
    )
    node1.query(" insert into GZIPHDFSStorage values (1, ' Mark ' , 72.53) ")

    assert hdfs_api.read_gzip_data(" /gzip_storage ") == expected_row
    assert node1.query(" select * from GZIPHDFSStorage ") == expected_row
2020-09-09 12:13:20 +00:00
2021-04-20 08:38:14 +00:00
2022-08-05 16:20:15 +00:00
def test_virtual_columns(started_cluster):
    """Select the ``_file`` and ``_path`` virtual columns from a glob HDFS table."""
    hdfs_api = started_cluster.hdfs_api
    node1.query(
        " create table virtual_cols (id UInt32) ENGINE = HDFS( ' hdfs://hdfs1:9000/file* ' , ' TSV ' ) "
    )

    # Three one-row files: /file1, /file2, /file3.
    for n in (1, 2, 3):
        hdfs_api.write_data(f" /file{n} ", f" {n} \n ")

    expected = " 1 \t file1 \t hdfs://hdfs1:9000/file1 \n 2 \t file2 \t hdfs://hdfs1:9000/file2 \n 3 \t file3 \t hdfs://hdfs1:9000/file3 \n "
    actual = node1.query(
        " select id, _file as file_name, _path as file_path from virtual_cols order by id "
    )
    assert actual == expected
2021-04-20 08:38:14 +00:00
2021-06-21 13:50:09 +00:00
2022-08-05 16:20:15 +00:00
def test_read_files_with_spaces(started_cluster):
    """Read HDFS files whose names contain spaces through a glob table."""
    hdfs_api = started_cluster.hdfs_api
    fs = HdfsClient(hosts=started_cluster.hdfs_ip)
    base_dir = " /test_spaces "

    # Recreate the directory from scratch.
    if fs.exists(base_dir):
        fs.delete(base_dir, recursive=True)
    fs.mkdirs(base_dir)

    for n in (1, 2, 3):
        hdfs_api.write_data(f" { base_dir } /test test test {n}.txt ", f" {n} \n ")

    node1.query(
        f" create table test (id UInt32) ENGINE = HDFS( ' hdfs://hdfs1:9000/ { base_dir } /test* ' , ' TSV ' ) "
    )
    assert node1.query(" select * from test order by id ") == " 1 \n 2 \n 3 \n "
    fs.delete(base_dir, recursive=True)
2021-04-19 20:39:22 +00:00
2022-08-05 16:20:15 +00:00
def test_truncate_table(started_cluster):
    """Truncate an HDFS-engine table and check that it reads back empty."""
    hdfs_api = started_cluster.hdfs_api
    expected_row = " 1 \t Mark \t 72.53 \n "
    select_all = " select * from test_truncate "

    node1.query(
        " create table test_truncate (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/tr ' , ' TSV ' ) "
    )
    node1.query(" insert into test_truncate values (1, ' Mark ' , 72.53) ")
    assert hdfs_api.read_data(" /tr ") == expected_row
    assert node1.query(select_all) == expected_row

    node1.query(" truncate table test_truncate ")
    assert node1.query(select_all) == " "
    node1.query(" drop table test_truncate ")
2022-08-05 16:20:15 +00:00
def test_partition_by(started_cluster):
    """Write partitioned output to HDFS and read each partition file back.

    The same three rows are inserted twice, partitioned by ``column3``:
    first through the hdfs() table function, then through an HDFS-engine
    table. After each insert the three per-partition files are read back.
    """
    hdfs_api = started_cluster.hdfs_api
    table_format = " column1 UInt32, column2 UInt32, column3 UInt32 "
    partition_by = " column3 "
    values = " (1, 2, 3), (3, 2, 1), (1, 3, 2) "
    # Expected (stripped) content of the file for each partition id.
    expected_by_partition = (
        (1, " 3 \t 2 \t 1 "),
        (2, " 1 \t 3 \t 2 "),
        (3, " 1 \t 2 \t 3 "),
    )

    def check_partitions(prefix):
        # Read <prefix>1..3 and compare each with its expected row.
        for partition_id, expected in expected_by_partition:
            result = node1.query(
                f" select * from hdfs( ' hdfs://hdfs1:9000/{prefix}{partition_id} ' , ' TSV ' , ' { table_format } ' ) "
            )
            assert result.strip() == expected

    file_name = " test_ {_partition_id} "
    table_function = f" hdfs( ' hdfs://hdfs1:9000/ { file_name } ' , ' TSV ' , ' { table_format } ' ) "
    node1.query(
        f" insert into table function { table_function } PARTITION BY { partition_by } values { values } "
    )
    check_partitions("test_")

    file_name = " test2_ {_partition_id} "
    node1.query(
        f" create table p(column1 UInt32, column2 UInt32, column3 UInt32) engine = HDFS( ' hdfs://hdfs1:9000/ { file_name } ' , ' TSV ' ) partition by column3 "
    )
    node1.query(f" insert into p values { values } ")
    check_partitions("test2_")
2021-10-26 12:22:13 +00:00
2021-10-25 16:23:44 +00:00
2022-08-05 16:20:15 +00:00
def test_seekable_formats(started_cluster):
    """Write and count five million rows in Parquet and then ORC on HDFS."""
    hdfs_api = started_cluster.hdfs_api
    table_functions = (
        " hdfs( ' hdfs://hdfs1:9000/parquet ' , ' Parquet ' , ' a Int32, b String ' ) ",
        " hdfs( ' hdfs://hdfs1:9000/orc ' , ' ORC ' , ' a Int32, b String ' ) ",
    )
    for table_function in table_functions:
        node1.query(
            f" insert into table function { table_function } SELECT number, randomString(100) FROM numbers(5000000) "
        )
        row_count = node1.query(f" SELECT count() FROM { table_function } ")
        assert int(row_count) == 5000000
2021-10-31 19:53:24 +00:00
2021-12-17 15:34:13 +00:00
2022-08-05 16:20:15 +00:00
def test_read_table_with_default(started_cluster):
    """Read a one-column file with a structure that adds a DEFAULT column."""
    hdfs_api = started_cluster.hdfs_api
    data = " n \n 100 \n "
    output = " n \t m \n 100 \t 200 \n "

    hdfs_api.write_data(" /simple_table_function ", data)
    assert hdfs_api.read_data(" /simple_table_function ") == data

    # Column m is declared as DEFAULT n * 2 in the structure argument.
    selected = node1.query(
        " select * from hdfs( ' hdfs://hdfs1:9000/simple_table_function ' , ' TSVWithNames ' , ' n UInt32, m UInt32 DEFAULT n * 2 ' ) FORMAT TSVWithNames "
    )
    assert selected == output
2021-12-06 03:54:45 +00:00
2022-08-05 16:20:15 +00:00
def test_schema_inference(started_cluster):
    """Infer the schema of a Native file via hdfs() and via an HDFS-engine table."""
    expected_schema = " a \t Int32 \t \t \t \t \t \n b \t String \t \t \t \t \t \n "
    expected_rows = 5000000

    node1.query(
        " insert into table function hdfs( ' hdfs://hdfs1:9000/native ' , ' Native ' , ' a Int32, b String ' ) SELECT number, randomString(100) FROM numbers(5000000) "
    )

    # Table function without an explicit structure.
    described = node1.query(" desc hdfs( ' hdfs://hdfs1:9000/native ' , ' Native ' ) ")
    assert described == expected_schema
    counted = node1.query(
        " select count(*) from hdfs( ' hdfs://hdfs1:9000/native ' , ' Native ' ) "
    )
    assert int(counted) == expected_rows

    # Table created without a column list.
    node1.query(
        " create table schema_inference engine=HDFS( ' hdfs://hdfs1:9000/native ' , ' Native ' ) "
    )
    described = node1.query(" desc schema_inference ")
    assert described == expected_schema
    counted = node1.query(" select count(*) from schema_inference ")
    assert int(counted) == expected_rows
2021-12-17 15:34:13 +00:00
2021-10-31 19:53:24 +00:00
2022-08-05 16:20:15 +00:00
def test_hdfsCluster(started_cluster):
    """Compare hdfs() and hdfsCluster() results over the same three files."""
    hdfs_api = started_cluster.hdfs_api
    fs = HdfsClient(hosts=started_cluster.hdfs_ip)
    base_dir = " /test_hdfsCluster "

    # Recreate the directory from scratch.
    if fs.exists(base_dir):
        fs.delete(base_dir, recursive=True)
    fs.mkdirs(base_dir)

    for n in (1, 2, 3):
        hdfs_api.write_data(f" /test_hdfsCluster/file{n} ", f" {n} \n ")

    expected = " 1 \t file1 \t hdfs://hdfs1:9000/test_hdfsCluster/file1 \n 2 \t file2 \t hdfs://hdfs1:9000/test_hdfsCluster/file2 \n 3 \t file3 \t hdfs://hdfs1:9000/test_hdfsCluster/file3 \n "

    single_node = node1.query(
        " select id, _file as file_name, _path as file_path from hdfs( ' hdfs://hdfs1:9000/test_hdfsCluster/file* ' , ' TSV ' , ' id UInt32 ' ) order by id "
    )
    assert single_node == expected

    clustered = node1.query(
        " select id, _file as file_name, _path as file_path from hdfsCluster( ' test_cluster_two_shards ' , ' hdfs://hdfs1:9000/test_hdfsCluster/file* ' , ' TSV ' , ' id UInt32 ' ) order by id "
    )
    assert clustered == expected

    fs.delete(base_dir, recursive=True)
2022-03-22 16:39:58 +00:00
2022-08-05 16:20:15 +00:00
def test_hdfs_directory_not_exist(started_cluster):
    """Create a table over the ``data/not_eixst`` path and check the select result."""
    node1.query(
        " create table HDFSStorageWithNotExistDir (id UInt32, name String, weight Float64) ENGINE = HDFS( ' hdfs://hdfs1:9000/data/not_eixst ' , ' TSV ' ) "
    )
    selected = node1.query(" select * from HDFSStorageWithNotExistDir ")
    assert " " == selected
2021-12-03 05:25:14 +00:00
2022-03-22 16:39:58 +00:00
2022-08-05 16:20:15 +00:00
def test_overwrite(started_cluster):
    """Check that a repeated insert fails unless hdfs_truncate_on_insert is set."""
    hdfs_api = started_cluster.hdfs_api
    table_function = " hdfs( ' hdfs://hdfs1:9000/data ' , ' Parquet ' , ' a Int32, b String ' ) "
    node1.query(f" create table test_overwrite as { table_function } ")

    # First insert creates the file.
    node1.query(
        " insert into test_overwrite select number, randomString(100) from numbers(5) "
    )
    # A second plain insert is expected to produce an error.
    node1.query_and_get_error(
        " insert into test_overwrite select number, randomString(100) FROM numbers(10) "
    )
    # With truncate-on-insert the data is replaced by the ten new rows.
    node1.query(
        " insert into test_overwrite select number, randomString(100) from numbers(10) settings hdfs_truncate_on_insert=1 "
    )

    row_count = node1.query(" select count() from test_overwrite ")
    assert int(row_count) == 10
2021-12-29 18:03:15 +00:00
2022-08-05 16:20:15 +00:00
def test_multiple_inserts(started_cluster):
    """Insert three batches with hdfs_create_new_file_on_insert and count all rows.

    The scenario runs twice: once for a plain path and once for a path
    ending in ``.gz``. Each run inserts 10 + 20 + 30 rows and expects 60.
    """
    hdfs_api = started_cluster.hdfs_api
    count_query = " select count() from test_multiple_inserts "

    # Plain path.
    table_function = " hdfs( ' hdfs://hdfs1:9000/data_multiple_inserts ' , ' Parquet ' , ' a Int32, b String ' ) "
    node1.query(f" create table test_multiple_inserts as { table_function } ")
    for insert_query in (
        " insert into test_multiple_inserts select number, randomString(100) from numbers(10) ",
        " insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings hdfs_create_new_file_on_insert=1 ",
        " insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings hdfs_create_new_file_on_insert=1 ",
    ):
        node1.query(insert_query)
    result = node1.query(count_query)
    assert int(result) == 60

    result = node1.query(" drop table test_multiple_inserts ")

    # Path with a .gz suffix.
    table_function = " hdfs( ' hdfs://hdfs1:9000/data_multiple_inserts.gz ' , ' Parquet ' , ' a Int32, b String ' ) "
    node1.query(f" create table test_multiple_inserts as { table_function } ")
    for insert_query in (
        " insert into test_multiple_inserts select number, randomString(100) FROM numbers(10) ",
        " insert into test_multiple_inserts select number, randomString(100) FROM numbers(20) settings hdfs_create_new_file_on_insert=1 ",
        " insert into test_multiple_inserts select number, randomString(100) FROM numbers(30) settings hdfs_create_new_file_on_insert=1 ",
    ):
        node1.query(insert_query)
    result = node1.query(count_query)
    assert int(result) == 60
2021-12-29 18:03:15 +00:00
2022-08-05 16:20:15 +00:00
def test_format_detection(started_cluster):
    """Create and read an HDFS table without passing a format argument."""
    node1.query(
        " create table arrow_table (x UInt64) engine=HDFS( ' hdfs://hdfs1:9000/data.arrow ' ) "
    )
    node1.query(" insert into arrow_table select 1 ")

    selected = node1.query(" select * from hdfs( ' hdfs://hdfs1:9000/data.arrow ' ) ")
    assert int(selected) == 1
2022-01-14 11:00:50 +00:00
2021-12-29 18:03:15 +00:00
2022-08-05 16:20:15 +00:00
def test_schema_inference_with_globs(started_cluster):
    """Infer a schema across several JSONCompactEachRow files matched by a glob.

    Covers a successful inference over two files, then two expected
    failures: a glob over files that hold only NULLs, and a glob that also
    matches a file written as TSV under the same extension.
    """
    node1.query(
        " insert into table function hdfs( ' hdfs://hdfs1:9000/data1.jsoncompacteachrow ' , ' JSONCompactEachRow ' , ' x Nullable(UInt32) ' ) select NULL "
    )
    node1.query(
        " insert into table function hdfs( ' hdfs://hdfs1:9000/data2.jsoncompacteachrow ' , ' JSONCompactEachRow ' , ' x Nullable(UInt32) ' ) select 0 "
    )

    described = node1.query(
        " desc hdfs( ' hdfs://hdfs1:9000/data*.jsoncompacteachrow ' ) settings input_format_json_infer_incomplete_types_as_strings=0 "
    )
    assert described.strip() == " c1 \t Nullable(Int64) "

    selected = node1.query(
        " select * from hdfs( ' hdfs://hdfs1:9000/data*.jsoncompacteachrow ' ) settings input_format_json_infer_incomplete_types_as_strings=0 "
    )
    assert sorted(selected.split()) == [" 0 ", " \\ N "]

    # data1 and data3 both contain only NULL.
    node1.query(
        " insert into table function hdfs( ' hdfs://hdfs1:9000/data3.jsoncompacteachrow ' , ' JSONCompactEachRow ' , ' x Nullable(UInt32) ' ) select NULL "
    )
    filename = " data { 1,3}.jsoncompacteachrow "
    error = node1.query_and_get_error(
        f" desc hdfs( ' hdfs://hdfs1:9000/ { filename } ' ) settings schema_inference_use_cache_for_hdfs=0, input_format_json_infer_incomplete_types_as_strings=0 "
    )
    assert " All attempts to extract table structure from files failed " in error

    # data0 is written as TSV although it carries the JSON extension.
    node1.query(
        " insert into table function hdfs( ' hdfs://hdfs1:9000/data0.jsoncompacteachrow ' , ' TSV ' , ' x String ' ) select ' [123;] ' "
    )
    error = node1.query_and_get_error(
        " desc hdfs( ' hdfs://hdfs1:9000/data*.jsoncompacteachrow ' ) settings schema_inference_use_cache_for_hdfs=0, input_format_json_infer_incomplete_types_as_strings=0 "
    )
    assert " CANNOT_EXTRACT_TABLE_STRUCTURE " in error
2022-04-13 16:59:04 +00:00
2022-02-09 16:14:14 +00:00
2022-08-05 16:20:15 +00:00
def test_insert_select_schema_inference(started_cluster):
    """Insert into hdfs() without a structure, then describe and read the file."""
    node1.query(
        " insert into table function hdfs( ' hdfs://hdfs1:9000/test.native.zst ' ) select toUInt64(1) as x "
    )

    described = node1.query(" desc hdfs( ' hdfs://hdfs1:9000/test.native.zst ' ) ")
    assert described.strip() == " x \t UInt64 "

    selected = node1.query(" select * from hdfs( ' hdfs://hdfs1:9000/test.native.zst ' ) ")
    assert int(selected) == 1
2022-02-18 16:19:42 +00:00
2022-04-07 10:11:49 +00:00
2022-08-05 16:20:15 +00:00
def test_cluster_join(started_cluster):
    """Join two hdfsCluster() sources and check the output for an ambiguity error name."""
    # Adjacent literals are concatenated into one multi-line query string.
    join_query = (
        "\n"
        "SELECT l . id , r . id FROM hdfsCluster ( ' test_cluster_two_shards ' , ' hdfs://hdfs1:9000/test_hdfsCluster/file* ' , ' TSV ' , ' id UInt32 ' ) as l\n"
        "JOIN hdfsCluster ( ' test_cluster_two_shards ' , ' hdfs://hdfs1:9000/test_hdfsCluster/file* ' , ' TSV ' , ' id UInt32 ' ) as r\n"
        "ON l . id = r . id\n"
    )
    result = node1.query(join_query)
    assert " AMBIGUOUS_COLUMN_NAME " not in result
2022-02-18 16:19:42 +00:00
2022-08-05 16:20:15 +00:00
def test_cluster_macro(started_cluster):
    """Check that a macro cluster name gives the same rows as the literal name."""
    macro_query = (
        "\n"
        "SELECT id FROM hdfsCluster ( ' {default_cluster_macro} ' , ' hdfs://hdfs1:9000/test_hdfsCluster/file* ' , ' TSV ' , ' id UInt32 ' )\n"
    )
    literal_query = (
        "\n"
        "SELECT id FROM hdfsCluster ( ' test_cluster_two_shards ' , ' hdfs://hdfs1:9000/test_hdfsCluster/file* ' , ' TSV ' , ' id UInt32 ' )\n"
    )
    with_macro = node1.query(macro_query)
    no_macro = node1.query(literal_query)
    assert TSV(with_macro) == TSV(no_macro)
2022-08-05 16:20:15 +00:00
def test_virtual_columns_2(started_cluster):
    """Check ``_path`` both as a virtual column and as a declared data column.

    In the first case the structure has no ``_path`` column and the select
    is expected to return the file URI. In the second the structure
    declares ``_path String`` and the select is expected to return the
    inserted value.
    """
    hdfs_api = started_cluster.hdfs_api
    # (table function, expected stripped result of selecting _path)
    cases = (
        (
            " hdfs( ' hdfs://hdfs1:9000/parquet_2 ' , ' Parquet ' , ' a Int32, b String ' ) ",
            " hdfs://hdfs1:9000/parquet_2 ",
        ),
        (
            " hdfs( ' hdfs://hdfs1:9000/parquet_3 ' , ' Parquet ' , ' a Int32, _path String ' ) ",
            " kek ",
        ),
    )
    for table_function, expected_path in cases:
        node1.query(f" insert into table function { table_function } SELECT 1, ' kek ' ")
        result = node1.query(f" SELECT _path FROM { table_function } ")
        assert result.strip() == expected_path
2022-03-24 16:10:04 +00:00
2023-08-22 11:59:59 +00:00
def check_profile_event_for_query(node, file, profile_event, amount=1):
    """Assert a ProfileEvents counter of the most recent finished hdfs() query on `file`.

    Flushes the logs, looks up the newest `QueryFinish` row in
    `system.query_log` whose text contains the hdfs() call for `file`, and
    asserts that its `profile_event` counter equals `amount`.

    Args:
        node: object exposing `query(sql)` that returns the result as text.
        file: file name (or glob) appended to `hdfs://hdfs1:9000/`.
        profile_event: key looked up in the `ProfileEvents` map.
        amount: expected counter value.
    """
    node.query("system flush logs")

    # The pattern ends up inside a single-quoted SQL literal, so its own
    # quotes are backslash-escaped.
    escaped_pattern = f"hdfs('hdfs://hdfs1:9000/{file}'".replace("'", "\\'")
    lookup = (
        f"select ProfileEvents['{profile_event}'] from system.query_log "
        f"where query like '%{escaped_pattern}%' and type = 'QueryFinish' "
        f"order by query_start_time_microseconds desc limit 1"
    )
    observed = int(node.query(lookup))
    assert observed == amount
2022-06-21 13:02:48 +00:00
2022-08-05 16:20:15 +00:00
def check_cache_misses(node1, file, amount=1):
    """Assert `amount` SchemaInferenceCacheMisses for the latest hdfs() query on `file`."""
    check_profile_event_for_query(
        node1, file, profile_event="SchemaInferenceCacheMisses", amount=amount
    )
2022-08-05 16:20:15 +00:00
def check_cache_hits(node1, file, amount=1):
    """Assert `amount` SchemaInferenceCacheHits for the latest hdfs() query on `file`."""
    check_profile_event_for_query(
        node1, file, profile_event="SchemaInferenceCacheHits", amount=amount
    )
2022-08-05 16:20:15 +00:00
def check_cache_invalidations(node1, file, amount=1):
    """Assert `amount` SchemaInferenceCacheInvalidations for the latest hdfs() query on `file`."""
    event_name = "SchemaInferenceCacheInvalidations"
    check_profile_event_for_query(node1, file, event_name, amount)
2022-08-05 16:20:15 +00:00
def check_cache_evictions(node1, file, amount=1):
    """Assert `amount` SchemaInferenceCacheEvictions for the latest hdfs() query on `file`."""
    check_profile_event_for_query(
        node1, file, profile_event="SchemaInferenceCacheEvictions", amount=amount
    )
def check_cache_num_rows_hits(node1, file, amount=1):
    """Assert `amount` SchemaInferenceCacheNumRowsHits for the latest hdfs() query on `file`."""
    event_name = "SchemaInferenceCacheNumRowsHits"
    check_profile_event_for_query(node1, file, event_name, amount)
2022-08-05 16:20:15 +00:00
def check_cache(node1, expected_files):
    """Assert which files are present in `system.schema_inference_cache`.

    Only the last path component of every cached `source` is compared, and
    the comparison ignores order.

    Args:
        node1: object exposing `query(sql)` that returns the result as text.
        expected_files: file names expected to be cached.
    """
    raw_sources = node1.query("select source from system.schema_inference_cache")
    cached_names = [entry.strip().split("/")[-1] for entry in raw_sources.split()]
    assert sorted(cached_names) == sorted(expected_files)
2022-08-05 16:20:15 +00:00
def run_describe_query(node, file):
    """Run `desc` on the hdfs() table function for `file`; the output is discarded."""
    node.query(f"desc hdfs('hdfs://hdfs1:9000/{file}')")
2023-08-22 11:59:59 +00:00
def run_count_query(node, file):
    """Return the raw `count()` output for `file` read via hdfs() with format `auto` and structure 'x UInt64'."""
    return node.query(
        f"select count() from hdfs('hdfs://hdfs1:9000/{file}', auto, 'x UInt64')"
    )
2022-06-21 13:02:48 +00:00
def test_schema_inference_cache(started_cluster):
    """Walk the schema inference cache through misses, hits, invalidations and evictions.

    Part one uses `desc` on JSONL files, part two uses `count()` on CSV files,
    and the last part runs a single `count()` on a Parquet file. The
    check_cache() expectations never list more than two files at once, so the
    cache is presumably capped at two entries by the server config (to be
    confirmed against configs/schema_cache.xml).
    """

    def write_numbers(file_name, rows=100):
        # Overwrite the file, then wait a second before it is queried again
        # (presumably so the rewrite is visible as a newer file; TODO confirm).
        node1.query(
            f"insert into function hdfs('hdfs://hdfs1:9000/{file_name}') select * from numbers({rows}) settings hdfs_truncate_on_insert=1"
        )
        time.sleep(1)

    def expect_count(file_name, rows):
        res = run_count_query(node1, file_name)
        assert int(res) == rows

    node1.query("system drop schema cache")

    # --- desc on JSONL files -------------------------------------------------
    write_numbers("test_cache0.jsonl")

    run_describe_query(node1, "test_cache0.jsonl")
    check_cache(node1, ["test_cache0.jsonl"])
    check_cache_misses(node1, "test_cache0.jsonl")

    run_describe_query(node1, "test_cache0.jsonl")
    check_cache_hits(node1, "test_cache0.jsonl")

    # Rewriting the file must invalidate its cached schema.
    write_numbers("test_cache0.jsonl")

    run_describe_query(node1, "test_cache0.jsonl")
    check_cache_invalidations(node1, "test_cache0.jsonl")

    write_numbers("test_cache1.jsonl")

    run_describe_query(node1, "test_cache1.jsonl")
    check_cache(node1, ["test_cache0.jsonl", "test_cache1.jsonl"])
    check_cache_misses(node1, "test_cache1.jsonl")

    run_describe_query(node1, "test_cache1.jsonl")
    check_cache_hits(node1, "test_cache1.jsonl")

    # A third file pushes test_cache0.jsonl out of the cache.
    write_numbers("test_cache2.jsonl")

    run_describe_query(node1, "test_cache2.jsonl")
    check_cache(node1, ["test_cache1.jsonl", "test_cache2.jsonl"])
    check_cache_misses(node1, "test_cache2.jsonl")
    check_cache_evictions(node1, "test_cache2.jsonl")

    run_describe_query(node1, "test_cache2.jsonl")
    check_cache_hits(node1, "test_cache2.jsonl")

    run_describe_query(node1, "test_cache1.jsonl")
    check_cache_hits(node1, "test_cache1.jsonl")

    run_describe_query(node1, "test_cache0.jsonl")
    check_cache(node1, ["test_cache0.jsonl", "test_cache1.jsonl"])
    check_cache_misses(node1, "test_cache0.jsonl")
    check_cache_evictions(node1, "test_cache0.jsonl")

    run_describe_query(node1, "test_cache2.jsonl")
    check_cache(node1, ["test_cache0.jsonl", "test_cache2.jsonl"])
    check_cache_misses(node1, "test_cache2.jsonl")
    check_cache_evictions(node1, "test_cache2.jsonl")

    run_describe_query(node1, "test_cache2.jsonl")
    check_cache_hits(node1, "test_cache2.jsonl")

    run_describe_query(node1, "test_cache0.jsonl")
    check_cache_hits(node1, "test_cache0.jsonl")

    write_numbers("test_cache3.jsonl")

    # Glob over all four files.
    files = "test_cache{0,1,2,3}.jsonl"
    run_describe_query(node1, files)
    check_cache_hits(node1, files)

    node1.query("system drop schema cache for hdfs")
    check_cache(node1, [])

    run_describe_query(node1, files)
    check_cache_misses(node1, files, 4)

    node1.query("system drop schema cache")
    check_cache(node1, [])

    run_describe_query(node1, files)
    check_cache_misses(node1, files, 4)

    # --- count() on CSV files ------------------------------------------------
    node1.query("system drop schema cache")
    check_cache(node1, [])

    write_numbers("test_cache0.csv")

    expect_count("test_cache0.csv", 100)
    check_cache(node1, ["test_cache0.csv"])
    check_cache_misses(node1, "test_cache0.csv")

    expect_count("test_cache0.csv", 100)
    check_cache_hits(node1, "test_cache0.csv")

    write_numbers("test_cache0.csv", rows=200)

    expect_count("test_cache0.csv", 200)
    check_cache_invalidations(node1, "test_cache0.csv")

    write_numbers("test_cache1.csv")

    expect_count("test_cache1.csv", 100)
    check_cache(node1, ["test_cache0.csv", "test_cache1.csv"])
    check_cache_misses(node1, "test_cache1.csv")

    expect_count("test_cache1.csv", 100)
    check_cache_hits(node1, "test_cache1.csv")

    expect_count("test_cache{0,1}.csv", 300)
    check_cache_hits(node1, "test_cache{0,1}.csv", 2)

    node1.query("system drop schema cache for hdfs")
    check_cache(node1, [])

    expect_count("test_cache{0,1}.csv", 300)
    check_cache_misses(node1, "test_cache{0,1}.csv", 2)

    # --- count() on a Parquet file -------------------------------------------
    node1.query("system drop schema cache for hdfs")
    check_cache(node1, [])

    write_numbers("test_cache.parquet")

    res = node1.query(
        "select count() from hdfs('hdfs://hdfs1:9000/test_cache.parquet')"
    )
    assert int(res) == 100

    # All three counters are checked against that single count() query.
    check_cache_misses(node1, "test_cache.parquet")
    check_cache_hits(node1, "test_cache.parquet")
    check_cache_num_rows_hits(node1, "test_cache.parquet")
2022-06-21 13:02:48 +00:00
2022-11-16 11:47:57 +00:00
def test_hdfsCluster_skip_unavailable_shards(started_cluster):
    """Read through hdfsCluster() on 'cluster_non_existent_port' with skip_unavailable_shards = 1.

    The cluster name suggests it contains an unreachable replica (defined in
    the server config, not visible here). With the setting enabled explicitly,
    the query must still return the complete file contents.
    """
    hdfs_api = started_cluster.hdfs_api
    data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
    hdfs_api.write_data("/skip_unavailable_shards", data)

    result = node1.query(
        "select * from hdfsCluster('cluster_non_existent_port', 'hdfs://hdfs1:9000/skip_unavailable_shards', 'TSV', 'id UInt64, text String, number Float64') settings skip_unavailable_shards = 1"
    )
    assert result == data
2022-11-15 05:25:15 +00:00
2023-02-23 09:05:51 +00:00
def test_hdfsCluster_unset_skip_unavailable_shards(started_cluster):
    """Read through hdfsCluster() on 'cluster_non_existent_port' without skip_unavailable_shards.

    Although skip_unavailable_shards is not set, cluster table functions
    should always skip unavailable shards, so the complete file contents are
    expected back.
    """
    hdfs_api = started_cluster.hdfs_api
    data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
    hdfs_api.write_data("/unskip_unavailable_shards", data)

    # Query the file written above, so the test does not depend on another
    # test having created /skip_unavailable_shards first.
    result = node1.query(
        "select * from hdfsCluster('cluster_non_existent_port', 'hdfs://hdfs1:9000/unskip_unavailable_shards', 'TSV', 'id UInt64, text String, number Float64')"
    )
    assert result == data
2022-11-15 05:25:15 +00:00
2023-05-30 19:32:24 +00:00
def test_skip_empty_files(started_cluster):
    """Exercise `hdfs_skip_empty_files` on an empty file and on a glob that includes it.

    `skip_empty_files1.parquet` is written from `numbers(0)` in TSVRaw format
    (presumably leaving a zero-byte file), and `skip_empty_files2.parquet`
    holds the single row from `numbers(1)`.
    """
    node = started_cluster.instances["node1"]
    empty_file = "hdfs://hdfs1:9000/skip_empty_files1.parquet"
    filled_file = "hdfs://hdfs1:9000/skip_empty_files2.parquet"
    both_files = "hdfs://hdfs1:9000/skip_empty_files*.parquet"

    node.query(
        f"insert into function hdfs('{empty_file}', TSVRaw) select * from numbers(0) settings hdfs_truncate_on_insert=1"
    )
    node.query(
        f"insert into function hdfs('{filled_file}') select * from numbers(1) settings hdfs_truncate_on_insert=1"
    )

    # Empty file alone, skipping disabled: must fail with or without a structure.
    node.query_and_get_error(
        f"select * from hdfs('{empty_file}') settings hdfs_skip_empty_files=0"
    )
    # The type is spelled 'UInt64' so that the expected failure comes from
    # reading the empty file and not from an unknown type name.
    node.query_and_get_error(
        f"select * from hdfs('{empty_file}', auto, 'number UInt64') settings hdfs_skip_empty_files=0"
    )

    # Empty file alone, skipping enabled: fails without a structure, and
    # returns nothing when the structure is given.
    node.query_and_get_error(
        f"select * from hdfs('{empty_file}') settings hdfs_skip_empty_files=1"
    )
    res = node.query(
        f"select * from hdfs('{empty_file}', auto, 'number UInt64') settings hdfs_skip_empty_files=1"
    )
    assert len(res) == 0

    # Glob over both files, skipping disabled: the empty file still breaks the read.
    node.query_and_get_error(
        f"select * from hdfs('{both_files}') settings hdfs_skip_empty_files=0"
    )
    node.query_and_get_error(
        f"select * from hdfs('{both_files}', auto, 'number UInt64') settings hdfs_skip_empty_files=0"
    )

    # Glob over both files, skipping enabled: only the row of the second file
    # (the value 0) comes back.
    res = node.query(
        f"select * from hdfs('{both_files}') settings hdfs_skip_empty_files=1"
    )
    assert int(res) == 0
    res = node.query(
        f"select * from hdfs('{both_files}', auto, 'number UInt64') settings hdfs_skip_empty_files=1"
    )
    assert int(res) == 0
2023-07-04 16:50:31 +00:00
def test_read_subcolumns(started_cluster):
    """Select tuple subcolumns together with `_path` and `_file` from TSV and JSONL files.

    The same nested tuple `((1, 2), 3)` is written to both files. Reading it
    back through column `a` must return the stored values. Reading the JSONL
    file through a column `x` that the file does not contain must return
    zeros, or the declared default when the structure gives one.
    """
    node = started_cluster.instances["node1"]
    tuple_type = "Tuple(b Tuple(c UInt32, d UInt32), e UInt32)"
    tsv_uri = "hdfs://hdfs1:9000/test_subcolumns.tsv"
    jsonl_uri = "hdfs://hdfs1:9000/test_subcolumns.jsonl"

    for uri in (tsv_uri, jsonl_uri):
        node.query(
            f"insert into function hdfs('{uri}', auto, 'a {tuple_type}') select ((1, 2), 3)"
        )

    # Column `a` exists in both files: the stored values come back.
    for uri, file_name in (
        (tsv_uri, "test_subcolumns.tsv"),
        (jsonl_uri, "test_subcolumns.jsonl"),
    ):
        res = node.query(
            f"select a.b.d, _path, a.b, _file, a.e from hdfs('{uri}', auto, 'a {tuple_type}')"
        )
        assert res == f"2\t{uri}\t(1,2)\t{file_name}\t3\n"

    # Column `x` is absent from the file and has no default: zeros are expected.
    res = node.query(
        f"select x.b.d, _path, x.b, _file, x.e from hdfs('{jsonl_uri}', auto, 'x {tuple_type}')"
    )
    assert res == f"0\t{jsonl_uri}\t(0,0)\ttest_subcolumns.jsonl\t0\n"

    # Column `x` is absent from the file but declares a default: the default is expected.
    res = node.query(
        f"select x.b.d, _path, x.b, _file, x.e from hdfs('{jsonl_uri}', auto, 'x {tuple_type} default ((42, 42), 42)')"
    )
    assert res == f"42\t{jsonl_uri}\t(42,42)\ttest_subcolumns.jsonl\t42\n"
2023-10-20 20:46:41 +00:00
def test_union_schema_inference_mode(started_cluster):
    """Check `schema_inference_mode='union'` over a glob of JSONL files.

    Two files with disjoint columns (`a` and `b`) must describe as the union
    of both, each file must get its own UNION entry in the schema cache, and a
    third file that holds TSV data must make the describe fail with
    CANNOT_EXTRACT_TABLE_STRUCTURE.
    """
    node = started_cluster.instances["node1"]
    base_uri = "hdfs://hdfs1:9000/test_union_schema_inference"
    union_settings = (
        "settings schema_inference_mode='union', describe_compact_output=1 format TSV"
    )
    desc_all = f"desc hdfs('{base_uri}*.jsonl') {union_settings}"

    node.query(f"insert into function hdfs('{base_uri}1.jsonl') select 1 as a")
    node.query(f"insert into function hdfs('{base_uri}2.jsonl') select 2 as b")

    node.query("system drop schema cache for hdfs")

    result = node.query(desc_all)
    assert result == "a\tNullable(Int64)\nb\tNullable(Int64)\n"

    # Each file is cached separately with its own schema.
    result = node.query(
        "select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache where source like '%test_union_schema_inference%' order by file format TSV"
    )
    assert result == (
        "UNION\ttest_union_schema_inference1.jsonl\ta Nullable(Int64)\n"
        "UNION\ttest_union_schema_inference2.jsonl\tb Nullable(Int64)\n"
    )

    # Columns missing from a file read as NULL.
    result = node.query(
        f"select * from hdfs('{base_uri}*.jsonl') order by tuple(*) {union_settings}"
    )
    assert result == "1\t\\N\n\\N\t2\n"

    node.query("system drop schema cache for hdfs")

    # Describing a single file first, then the whole glob again.
    result = node.query(f"desc hdfs('{base_uri}2.jsonl') {union_settings}")
    assert result == "b\tNullable(Int64)\n"

    result = node.query(desc_all)
    assert result == "a\tNullable(Int64)\nb\tNullable(Int64)\n"

    # A .jsonl file written in TSV format is expected to break schema inference.
    node.query(
        f"insert into function hdfs('{base_uri}3.jsonl', TSV) select 'Error'"
    )
    error = node.query_and_get_error(desc_all)
    assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in error
2023-10-20 20:46:41 +00:00
2024-01-22 22:55:50 +00:00
def test_format_detection(started_cluster):
    """Check that omitting the format (or passing `auto`) gives the same results as JSONEachRow.

    Two extension-less files are written as JSONEachRow: one with no rows and
    one with ten. Describe and select results obtained with the explicit
    format are the reference for every variant of hdfs() and hdfsCluster()
    that omits the format.
    """
    node = started_cluster.instances["node1"]
    empty_uri = "hdfs://hdfs1:9000/test_format_detection0"
    data_uri = "hdfs://hdfs1:9000/test_format_detection1"
    both_uris = "hdfs://hdfs1:9000/test_format_detection{0,1}"
    row_source = "select number as x, 'str_' || toString(number) as y from numbers"

    node.query(
        f"insert into function hdfs('{empty_uri}', JSONEachRow) {row_source}(0)"
    )
    node.query(
        f"insert into function hdfs('{data_uri}', JSONEachRow) {row_source}(10)"
    )

    expected_desc_result = node.query(f"desc hdfs('{data_uri}', JSONEachRow)")
    desc_result = node.query(f"desc hdfs('{data_uri}')")
    assert expected_desc_result == desc_result

    expected_result = node.query(
        f"select * from hdfs('{data_uri}', JSONEachRow, 'x UInt64, y String') order by x, y"
    )

    def select_sorted(source):
        return node.query(f"select * from {source} order by x, y")

    sources_before_cache_drop = (
        f"hdfs('{data_uri}')",
        f"hdfs('{data_uri}', auto, 'x UInt64, y String')",
        f"hdfs('{both_uris}')",
    )
    for source in sources_before_cache_drop:
        assert expected_result == select_sorted(source)

    node.query("system drop schema cache for hdfs")

    sources_after_cache_drop = (
        f"hdfs('{both_uris}')",
        f"hdfsCluster(test_cluster_two_shards, '{both_uris}')",
        f"hdfsCluster(test_cluster_two_shards, '{both_uris}', auto, auto)",
        f"hdfsCluster(test_cluster_two_shards, '{both_uris}', auto, 'x UInt64, y String')",
    )
    for source in sources_after_cache_drop:
        assert expected_result == select_sorted(source)
2024-04-08 20:37:06 +00:00
def test_write_to_globbed_partitioned_path(started_cluster):
    """A partitioned insert into a path that also contains a glob must fail with DATABASE_ACCESS_DENIED."""
    node = started_cluster.instances["node1"]

    # Plain string: `{_partition_id}` is the server-side placeholder, not a Python one.
    insert_query = "insert into function hdfs('hdfs://hdfs1:9000/test_data_*_{_partition_id}.csv') partition by 42 select 42"
    error = node.query_and_get_error(insert_query)

    assert "DATABASE_ACCESS_DENIED" in error
2024-05-10 12:28:55 +00:00
2024-04-08 20:18:47 +00:00
def test_respect_object_existence_on_partitioned_write(started_cluster):
    """Check that partitioned inserts honour the settings for already existing files.

    With partition id 42 the partitioned path resolves to the file written
    first. The insert must fail with BAD_ARGUMENTS when truncation is off,
    overwrite the file when truncation is on, and write to a `.1` sibling
    when `hdfs_create_new_file_on_insert` is enabled.
    """
    node = started_cluster.instances["node1"]

    # Plain strings below: `{_partition_id}` is substituted by the server.
    partitioned_target = (
        "hdfs('hdfs://hdfs1:9000/test_partitioned_write{_partition_id}.csv', CSV)"
    )

    def read_value(file_name):
        return int(
            node.query(f"select * from hdfs('hdfs://hdfs1:9000/{file_name}', CSV)")
        )

    node.query(
        "insert into function hdfs('hdfs://hdfs1:9000/test_partitioned_write42.csv', CSV) select 42 settings hdfs_truncate_on_insert=1"
    )
    assert read_value("test_partitioned_write42.csv") == 42

    # The file exists and truncation is disabled: the insert is rejected.
    error = node.query_and_get_error(
        "insert into table function "
        + partitioned_target
        + " partition by 42 select 42 settings hdfs_truncate_on_insert=0"
    )
    assert "BAD_ARGUMENTS" in error

    # Truncation enabled: the existing file is replaced.
    node.query(
        "insert into table function "
        + partitioned_target
        + " partition by 42 select 43 settings hdfs_truncate_on_insert=1"
    )
    assert read_value("test_partitioned_write42.csv") == 43

    # New-file mode: the data goes to a file with a `.1` suffix before the extension.
    node.query(
        "insert into table function "
        + partitioned_target
        + " partition by 42 select 44 settings hdfs_truncate_on_insert=0, hdfs_create_new_file_on_insert=1"
    )
    assert read_value("test_partitioned_write42.1.csv") == 44
2024-04-08 20:37:06 +00:00
2022-03-22 16:39:58 +00:00
if __name__ == "__main__":
    # Manual entry point: bring the cluster up and keep it running until the
    # operator presses Enter. The try/finally mirrors the `started_cluster`
    # fixture, so the cluster is shut down even if the wait is interrupted
    # (for example with Ctrl-C).
    try:
        cluster.start()
        input("Cluster created, press any key to destroy...")
    finally:
        cluster.shutdown()