2018-12-05 13:24:45 +00:00
import os
2024-09-27 10:19:39 +00:00
import re
import time
import uuid
2018-12-05 13:24:45 +00:00
2020-09-16 04:26:10 +00:00
import pytest
2024-09-27 10:19:39 +00:00
from pyhdfs import HdfsClient
2024-07-05 12:44:31 +00:00
from helpers . client import QueryRuntimeException
2024-09-27 10:19:39 +00:00
from helpers . cluster import ClickHouseCluster , is_arm
2022-04-27 23:32:49 +00:00
from helpers . test_tools import TSV
2018-12-05 13:24:45 +00:00
2024-03-20 14:05:06 +00:00
# Skip every test in this module on ARM hosts: assigning the module-level
# `pytestmark` applies the marker to all tests below.
if is_arm():
    pytestmark = pytest.mark.skip

cluster = ClickHouseCluster(__file__)

# A single ClickHouse server with an HDFS service attached (`with_hdfs=True`).
# NOTE(review): `configs/cluster.xml` is presumably where the
# `test_cluster_two_shards` cluster used by test_hdfsCluster is declared --
# confirm against the config file.
node1 = cluster.add_instance(
    "node1",
    with_hdfs=True,
    main_configs=[
        "configs/macro.xml",
        "configs/schema_cache.xml",
        "configs/cluster.xml",
    ],
)
2018-12-05 13:24:45 +00:00
2020-09-16 04:26:10 +00:00
2018-12-05 13:24:45 +00:00
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture: start the shared cluster, yield it, stop it.

    ``shutdown()`` lives in ``finally`` so it is called even if ``start()``
    raises, and also when the fixture is torn down after the module's tests.
    """
    running = cluster
    try:
        running.start()
        yield running
    finally:
        running.shutdown()
2022-03-22 16:39:58 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_storage(started_cluster):
    """Insert one row through an HDFS-engine table and read it back twice.

    The row is checked both as raw TSV in the underlying HDFS file and via a
    SELECT on the table. A fresh uuid-suffixed file name isolates each run.
    """
    expected_row = "1\tMark\t72.53\n"
    filename = f"simple_storage_{uuid.uuid4()}"
    hdfs_api = started_cluster.hdfs_api

    node1.query("drop table if exists SimpleHDFSStorage SYNC")
    node1.query(
        "create table SimpleHDFSStorage (id UInt32, name String, weight Float64) "
        f"ENGINE = HDFS('hdfs://hdfs1:9000/{filename}', 'TSV')"
    )
    node1.query("insert into SimpleHDFSStorage values (1, 'Mark', 72.53)")

    assert hdfs_api.read_data(f"/{filename}") == expected_row
    assert node1.query("select * from SimpleHDFSStorage") == expected_row
2020-09-16 04:26:10 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_storage_with_globs(started_cluster):
    """HDFS tables whose path contains a glob read every match and reject inserts.

    One table is created per glob flavour (``{a..b}``, ``{a,b,...}``, ``?``,
    ``*`` and ``**``). Three flat files ``/storage1..3`` and three nested
    ``/subdirN/fileN.doublestar.tsv`` files are written. Every table must then
    count exactly 3 rows. Inserting into the enum, ``?``, ``*`` and ``**``
    tables must fail with a read-only error.
    """
    hdfs_api = started_cluster.hdfs_api
    row_tail = "\tMark\t72.53\n"

    # table name -> glob used as the HDFS path (dict order is also drop order)
    glob_tables = {
        "HDFSStorageWithRange": "storage{1..5}",
        "HDFSStorageWithEnum": "storage{1,2,3,4,5}",
        "HDFSStorageWithQuestionMark": "storage?",
        "HDFSStorageWithAsterisk": "storage*",
        "HDFSStorageWithDoubleAsterisk": "**.doublestar.tsv",
    }
    for table, glob in glob_tables.items():
        node1.query(
            f"create table {table} (id UInt32, name String, weight Float64) "
            f"ENGINE = HDFS('hdfs://hdfs1:9000/{glob}', 'TSV')"
        )

    for i in ["1", "2", "3"]:
        for path in (f"/storage{i}", f"/subdir{i}/file{i}.doublestar.tsv"):
            hdfs_api.write_data(path, i + row_tail)
            assert hdfs_api.read_data(path) == i + row_tail

    # NOTE(review): this is an HDFS table but the setting is the S3 one;
    # `hdfs_throw_on_zero_files_match` was probably intended -- confirm.
    assert (
        node1.query(
            "select count(*) from HDFSStorageWithRange settings s3_throw_on_zero_files_match=1"
        )
        == "3\n"
    )

    checked_tables = [
        "HDFSStorageWithEnum",
        "HDFSStorageWithQuestionMark",
        "HDFSStorageWithAsterisk",
        "HDFSStorageWithDoubleAsterisk",
    ]
    for table in checked_tables:
        assert node1.query(f"select count(*) from {table}") == "3\n", table

    # Previously each insert was wrapped in `try: ...; assert False` under a
    # broad `except Exception`. That caught its own AssertionError, so a
    # missing server error surfaced only as a confusing message mismatch.
    # `query_and_get_error` returns the server's error text; like its use in
    # test_overwrite, it is assumed to fail the test if the query succeeds.
    for table in checked_tables:
        error = node1.query_and_get_error(
            f"insert into {table} values (1, 'NEW', 4.2)"
        )
        print(error)
        assert "in readonly mode" in error, error

    for table in glob_tables:
        node1.query(f"drop table {table}")
2020-09-16 04:26:10 +00:00
2023-06-11 00:09:05 +00:00
def test_storage_with_multidirectory_glob(started_cluster):
    """A ``{a/b,c/d}`` glob spanning several directory levels selects both files.

    A glob whose alternatives match nothing must fail with a "no files" error.
    """
    hdfs_api = started_cluster.hdfs_api
    for i in ["1", "2"]:
        path = f"/multiglob/p{i}/path{i}/postfix/data{i}"
        content = f"File{i}\t{i}{i}\n"
        hdfs_api.write_data(path, content)
        assert hdfs_api.read_data(path) == content

    # Plain strings (not f-strings): the braces are ClickHouse glob syntax.
    r = node1.query(
        "SELECT * FROM hdfs('hdfs://hdfs1:9000/multiglob/{p1/path1,p2/path2}/postfix/data{1,2}', TSV)"
    )
    # The two files may be read in either order.
    assert r in ("File1\t11\nFile2\t22\n", "File2\t22\nFile1\t11\n"), r

    # `query_and_get_error` replaces a `try/assert False/except Exception`
    # whose broad except swallowed its own AssertionError.
    error = node1.query_and_get_error(
        "SELECT * FROM hdfs('hdfs://hdfs1:9000/multiglob/{p4/path1,p2/path3}/postfix/data{1,2}.nonexist', TSV)"
    )
    print(error)
    assert "no files" in error, error
2023-06-11 00:09:05 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_table(started_cluster):
    """Read a TSV file written directly to HDFS through the hdfs() table function."""
    path = "/simple_table_function"
    data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
    api = started_cluster.hdfs_api

    api.write_data(path, data)
    assert api.read_data(path) == data

    query = (
        "select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', "
        "'id UInt64, text String, number Float64')"
    )
    assert node1.query(query) == data
2019-01-17 14:10:30 +00:00
2022-08-05 16:20:15 +00:00
def test_write_table(started_cluster):
    """Rows inserted into an HDFS-engine table land in its file as TSV."""
    api = started_cluster.hdfs_api
    expected = "10\ttomas\t55.55\n11\tjack\t32.54\n"

    node1.query(
        "create table OtherHDFSStorage (id UInt32, name String, weight Float64) "
        "ENGINE = HDFS('hdfs://hdfs1:9000/other_storage', 'TSV')"
    )
    node1.query(
        "insert into OtherHDFSStorage values (10, 'tomas', 55.55), (11, 'jack', 32.54)"
    )

    assert api.read_data("/other_storage") == expected
    assert node1.query("select * from OtherHDFSStorage order by id") == expected

    # Clear the file, then remove the table definition.
    for statement in ("truncate table OtherHDFSStorage", "drop table OtherHDFSStorage"):
        node1.query(statement)
2019-01-19 20:17:19 +00:00
2020-09-16 04:26:10 +00:00
2022-08-05 16:20:15 +00:00
def test_bad_hdfs_uri(started_cluster):
    """Error reporting for malformed and unreachable HDFS URIs.

    A URI without the ``hdfs://`` scheme must be rejected at CREATE time.
    For an unknown host and for an invalid path, CREATE is not required to
    fail. If it does fail, the error text is checked. The table is dropped
    with ``if exists`` so cleanup works whether or not it was created.
    """
    # Previously a bare try/except: if CREATE stopped failing, nothing was
    # asserted and the test passed silently.
    error = node1.query_and_get_error(
        "create table BadStorage1 (id UInt32, name String, weight Float64) "
        "ENGINE = HDFS('hads:hgsdfs100500:9000/other_storage', 'TSV')"
    )
    print(error)
    assert "Bad HDFS URL" in error, error

    # (table, HDFS URL, error text expected if CREATE fails)
    lenient_cases = [
        ("BadStorage2", "hdfs://hdfs100500:9000/other_storage", "Unable to connect to HDFS"),
        ("BadStorage3", "hdfs://hdfs1:9000/<>", "Unable to open HDFS file"),
    ]
    for table, url, message in lenient_cases:
        try:
            node1.query(
                f"create table {table} (id UInt32, name String, weight Float64) "
                f"ENGINE = HDFS('{url}', 'TSV')"
            )
        except Exception as ex:
            print(ex)
            assert message in str(ex)
        # `if exists`: the old unconditional DROP failed with an unknown-table
        # error in exactly the case the except-branch above anticipates.
        node1.query(f"drop table if exists {table}")
2019-08-01 15:46:54 +00:00
2022-03-22 16:39:58 +00:00
2020-09-28 17:20:04 +00:00
@pytest.mark.timeout(800)
def test_globs_in_read_table(started_cluster):
    """Glob patterns in hdfs() select the expected files, paths and rows.

    Every file under ``globs_dir`` holds the same two rows, so a pattern that
    matches N paths must return ``N * some_data``. Each case also checks the
    number of distinct ``_path`` and ``_file`` virtual-column values.
    """
    hdfs_api = started_cluster.hdfs_api
    some_data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
    globs_dir = "/dir_for_test_with_globs/"
    files = [
        "dir1/dir_dir/file1",
        "dir2/file2",
        "simple_table_function",
        "dir/file",
        "some_dir/dir1/file",
        "some_dir/dir2/file",
        "some_dir/file",
        "table1_function",
        "table2_function",
        "table3_function",
    ]
    for filename in files:
        hdfs_api.write_data(globs_dir + filename, some_data)

    # (glob pattern, distinct _path values expected, distinct _file values expected)
    test_requests = [
        ("dir{1..5}/dir_dir/file1", 1, 1),
        ("*_table_functio?", 1, 1),
        ("dir/fil?", 1, 1),
        ("table{3..8}_function", 1, 1),
        ("table{2..8}_function", 2, 2),
        ("dir/*", 1, 1),
        ("dir/*?*?*?*?*", 1, 1),
        ("dir/*?*?*?*?*?*", 0, 0),
        ("some_dir/*/file", 2, 1),
        ("some_dir/dir?/*", 2, 1),
        ("*/*/*", 3, 2),
        ("?", 0, 0),
    ]
    for pattern, paths_amount, files_amount in test_requests:
        inside_table_func = (
            f"'hdfs://hdfs1:9000{globs_dir}{pattern}', "
            "'TSV', 'id UInt64, text String, number Float64'"
        )
        print("inside_table_func", inside_table_func)
        source = f"hdfs({inside_table_func})"

        assert node1.query(f"select * from {source}") == paths_amount * some_data

        distinct_paths = node1.query(f"select count(distinct _path) from {source}")
        assert distinct_paths.rstrip() == str(paths_amount)

        distinct_files = node1.query(f"select count(distinct _file) from {source}")
        assert distinct_files.rstrip() == str(files_amount)
2020-09-16 04:26:10 +00:00
2019-11-19 12:46:07 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_gzip_table(started_cluster):
    """Gzip data in a ``.gz`` file is read by hdfs() with no compression argument."""
    rows = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
    path = "/simple_table_function.gz"
    api = started_cluster.hdfs_api

    api.write_gzip_data(path, rows)
    assert api.read_gzip_data(path) == rows

    query = (
        "select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', "
        "'id UInt64, text String, number Float64')"
    )
    assert node1.query(query) == rows
2020-09-16 04:26:10 +00:00
2019-11-19 12:46:07 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_gzip_table_with_parameter_gzip(started_cluster):
    """Gzip data in a file without a ``.gz`` suffix is read when 'gzip' is passed."""
    rows = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
    path = "/simple_table_function"
    api = started_cluster.hdfs_api

    api.write_gzip_data(path, rows)
    assert api.read_gzip_data(path) == rows

    query = (
        "select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', "
        "'id UInt64, text String, number Float64', 'gzip')"
    )
    assert node1.query(query) == rows
2020-09-16 04:26:10 +00:00
2019-11-19 12:46:07 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_table_with_parameter_none(started_cluster):
    """Uncompressed data in a ``.gz``-named file is read when 'none' is passed."""
    rows = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
    path = "/simple_table_function.gz"
    api = started_cluster.hdfs_api

    # Written as plain text despite the .gz suffix.
    api.write_data(path, rows)
    assert api.read_data(path) == rows

    query = (
        "select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', "
        "'id UInt64, text String, number Float64', 'none')"
    )
    assert node1.query(query) == rows
2020-09-16 04:26:10 +00:00
2019-11-19 12:46:07 +00:00
2022-08-05 16:20:15 +00:00
def test_read_write_gzip_table_with_parameter_auto_gz(started_cluster):
    """Gzip data in a ``.gz`` file is read when compression 'auto' is passed."""
    rows = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
    path = "/simple_table_function.gz"
    api = started_cluster.hdfs_api

    api.write_gzip_data(path, rows)
    assert api.read_gzip_data(path) == rows

    query = (
        "select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', "
        "'id UInt64, text String, number Float64', 'auto')"
    )
    assert node1.query(query) == rows
2020-09-16 04:26:10 +00:00
2019-11-19 12:46:07 +00:00
2022-08-05 16:20:15 +00:00
def test_write_gz_storage(started_cluster):
    """Inserting into an HDFS table whose path ends in ``.gz`` writes gzip data."""
    api = started_cluster.hdfs_api
    row = "1\tMark\t72.53\n"

    node1.query(
        "create table GZHDFSStorage (id UInt32, name String, weight Float64) "
        "ENGINE = HDFS('hdfs://hdfs1:9000/storage.gz', 'TSV')"
    )
    node1.query("insert into GZHDFSStorage values (1, 'Mark', 72.53)")

    assert api.read_gzip_data("/storage.gz") == row
    assert node1.query("select * from GZHDFSStorage") == row

    for statement in ("truncate table GZHDFSStorage", "drop table GZHDFSStorage"):
        node1.query(statement)
2019-11-19 12:46:07 +00:00
2020-09-16 04:26:10 +00:00
2022-08-05 16:20:15 +00:00
def test_write_gzip_storage(started_cluster):
    """An explicit 'gzip' argument compresses writes even without a ``.gz`` suffix."""
    api = started_cluster.hdfs_api
    row = "1\tMark\t72.53\n"

    node1.query(
        "create table GZIPHDFSStorage (id UInt32, name String, weight Float64) "
        "ENGINE = HDFS('hdfs://hdfs1:9000/gzip_storage', 'TSV', 'gzip')"
    )
    node1.query("insert into GZIPHDFSStorage values (1, 'Mark', 72.53)")

    assert api.read_gzip_data("/gzip_storage") == row
    assert node1.query("select * from GZIPHDFSStorage") == row

    for statement in ("truncate table GZIPHDFSStorage", "drop table GZIPHDFSStorage"):
        node1.query(statement)
2020-09-09 12:13:20 +00:00
2021-04-20 08:38:14 +00:00
2022-08-05 16:20:15 +00:00
def test_virtual_columns(started_cluster):
    """``_file`` and ``_path`` virtual columns are filled for every matched file."""
    api = started_cluster.hdfs_api
    node1.query(
        "create table virtual_cols (id UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/file*', 'TSV')"
    )
    for n in (1, 2, 3):
        api.write_data(f"/file{n}", f"{n}\n")

    expected = "".join(f"{n}\tfile{n}\tfile{n}\n" for n in (1, 2, 3))
    actual = node1.query(
        "select id, _file as file_name, _path as file_path from virtual_cols order by id"
    )
    assert actual == expected
    node1.query("drop table virtual_cols")
2021-04-20 08:38:14 +00:00
2021-06-21 13:50:09 +00:00
2022-08-05 16:20:15 +00:00
def test_read_files_with_spaces(started_cluster):
    """Files whose names contain spaces are matched by a glob and read."""
    hdfs_api = started_cluster.hdfs_api
    fs = HdfsClient(hosts=started_cluster.hdfs_ip)
    test_dir = "/test_spaces"

    # Start from an empty directory in case an earlier run left files behind.
    if fs.exists(test_dir):
        fs.delete(test_dir, recursive=True)
    fs.mkdirs(test_dir)
    for n in range(1, 4):
        hdfs_api.write_data(f"{test_dir}/test test test {n}.txt", f"{n}\n")

    node1.query(
        f"create table test (id UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/{test_dir}/test*', 'TSV')"
    )
    assert node1.query("select * from test order by id") == "1\n2\n3\n"

    fs.delete(test_dir, recursive=True)
    node1.query("drop table test")
2021-11-10 14:20:25 +00:00
2021-04-19 20:39:22 +00:00
2022-08-05 16:20:15 +00:00
def test_truncate_table(started_cluster):
    """TRUNCATE on an HDFS table removes its data.

    A later SELECT with ``hdfs_ignore_file_doesnt_exist=1`` returns nothing.
    """
    api = started_cluster.hdfs_api
    row = "1\tMark\t72.53\n"

    node1.query(
        "create table test_truncate (id UInt32, name String, weight Float64) "
        "ENGINE = HDFS('hdfs://hdfs1:9000/tr', 'TSV')"
    )
    node1.query("insert into test_truncate values (1, 'Mark', 72.53)")
    assert api.read_data("/tr") == row
    assert node1.query("select * from test_truncate") == row

    node1.query("truncate table test_truncate")
    after_truncate = node1.query(
        "select * from test_truncate settings hdfs_ignore_file_doesnt_exist=1"
    )
    assert after_truncate == ""

    node1.query("drop table test_truncate")
2022-08-05 16:20:15 +00:00
def test_partition_by(started_cluster):
    """PARTITION BY writes one HDFS file per partition key value.

    The test runs twice inside a fresh per-run directory:
    - via ``insert into table function hdfs(...) PARTITION BY``;
    - via an HDFS-engine table declared with ``partition by``.

    Each partition file, named after its ``column3`` value, is read back and
    compared to its single row. The directory is removed at the end.
    """
    fs = HdfsClient(hosts=started_cluster.hdfs_ip)
    dir_name = f"partition_{uuid.uuid4()}"
    fs.mkdirs(f"/{dir_name}/", permission=777)

    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    partition_by = "column3"
    values = "(1, 2, 3), (3, 2, 1), (1, 3, 2)"
    # column3 value (partition id) -> the only row expected in that file
    expected_rows = {1: "3\t2\t1", 2: "1\t3\t2", 3: "1\t2\t3"}

    def check_partition_files(prefix):
        # Read each `<prefix>_<partition id>` file back through hdfs().
        for partition_id, row in expected_rows.items():
            result = node1.query(
                f"select * from hdfs('hdfs://hdfs1:9000/{dir_name}/{prefix}_{partition_id}', "
                f"'TSV', '{table_format}')"
            )
            assert result.strip() == row

    # Plain (non f-) string so `{_partition_id}` reaches ClickHouse verbatim.
    file_name = "test_{_partition_id}"
    table_function = (
        f"hdfs('hdfs://hdfs1:9000/{dir_name}/{file_name}', 'TSV', '{table_format}')"
    )
    node1.query(
        f"insert into table function {table_function} PARTITION BY {partition_by} values {values}"
    )
    check_partition_files("test")

    file_name = "test2_{_partition_id}"
    node1.query(
        f"create table p(column1 UInt32, column2 UInt32, column3 UInt32) "
        f"engine = HDFS('hdfs://hdfs1:9000/{dir_name}/{file_name}', 'TSV') "
        f"partition by {partition_by}"
    )
    node1.query(f"insert into p values {values}")
    check_partition_files("test2")

    node1.query("drop table p")
    # Must be an f-string: the previous literal "/{dir}" never matched the
    # directory created above, so that directory was left behind on HDFS.
    fs.delete(f"/{dir_name}", recursive=True)
2021-10-26 12:22:13 +00:00
2021-10-25 16:23:44 +00:00
2022-08-05 16:20:15 +00:00
def test_seekable_formats(started_cluster):
    """Round-trip 5,000,000 rows through hdfs() in the Parquet and ORC formats."""
    expected_rows = 5000000
    for path, fmt in (("parquet", "Parquet"), ("orc", "ORC")):
        table_function = f"hdfs('hdfs://hdfs1:9000/{path}', '{fmt}', 'a Int32, b String')"
        node1.query(
            f"insert into table function {table_function} "
            "SELECT number, randomString(100) FROM numbers(5000000) "
            "SETTINGS hdfs_truncate_on_insert=1"
        )
        result = node1.query(f"SELECT count() FROM {table_function}")
        assert int(result) == expected_rows
2021-10-31 19:53:24 +00:00
2021-12-17 15:34:13 +00:00
2022-08-05 16:20:15 +00:00
def test_read_table_with_default(started_cluster):
    """A column absent from the file is filled from its DEFAULT expression (m = n * 2)."""
    path = "/simple_table_function"
    data = "n\n100\n"
    api = started_cluster.hdfs_api

    api.write_data(path, data)
    assert api.read_data(path) == data

    query = (
        "select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSVWithNames', "
        "'n UInt32, m UInt32 DEFAULT n * 2') FORMAT TSVWithNames"
    )
    assert node1.query(query) == "n\tm\n100\t200\n"
2021-12-06 03:54:45 +00:00
2022-08-05 16:20:15 +00:00
def test_schema_inference(started_cluster):
    """Infer the schema of a Native file, first via hdfs() and then via a table.

    The table is created without an explicit column list.
    """
    expected_desc = "a\tInt32\t\t\t\t\t\nb\tString\t\t\t\t\t\n"
    native_source = "hdfs('hdfs://hdfs1:9000/native', 'Native')"

    def check(source):
        # DESC must show the inferred columns; all rows must be readable.
        assert node1.query(f"desc {source}") == expected_desc
        assert int(node1.query(f"select count(*) from {source}")) == 5000000

    node1.query(
        "insert into table function hdfs('hdfs://hdfs1:9000/native', 'Native', 'a Int32, b String') "
        "SELECT number, randomString(100) FROM numbers(5000000) SETTINGS hdfs_truncate_on_insert=1"
    )
    check(native_source)

    node1.query(
        "create table schema_inference engine=HDFS('hdfs://hdfs1:9000/native', 'Native')"
    )
    check("schema_inference")
    node1.query("drop table schema_inference")
2021-12-17 15:34:13 +00:00
2021-10-31 19:53:24 +00:00
2022-08-05 16:20:15 +00:00
def test_hdfsCluster(started_cluster):
    """hdfs() and hdfsCluster() return the same rows and virtual columns for a glob."""
    hdfs_api = started_cluster.hdfs_api
    fs = HdfsClient(hosts=started_cluster.hdfs_ip)
    test_dir = "/test_hdfsCluster"

    # Start from an empty directory in case an earlier run left files behind.
    if fs.exists(test_dir):
        fs.delete(test_dir, recursive=True)
    fs.mkdirs(test_dir)
    for n in (1, 2, 3):
        hdfs_api.write_data(f"/test_hdfsCluster/file{n}", f"{n}\n")

    expected = "".join(
        f"{n}\tfile{n}\ttest_hdfsCluster/file{n}\n" for n in (1, 2, 3)
    )
    queries = [
        "select id, _file as file_name, _path as file_path from hdfs('hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') order by id",
        "select id, _file as file_name, _path as file_path from hdfsCluster('test_cluster_two_shards', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') order by id",
    ]
    for query in queries:
        assert node1.query(query) == expected

    fs.delete(test_dir, recursive=True)
2022-03-22 16:39:58 +00:00
2022-08-05 16:20:15 +00:00
def test_hdfs_directory_not_exist(started_cluster):
    """A table over a missing HDFS directory reads as empty when missing files are ignored.

    Missing files are ignored via ``hdfs_ignore_file_doesnt_exist=1``.
    """
    table = "HDFSStorageWithNotExistDir"
    node1.query(
        f"create table {table} (id UInt32, name String, weight Float64) "
        "ENGINE = HDFS('hdfs://hdfs1:9000/data/not_eixst', 'TSV')"
    )
    result = node1.query(
        f"select * from {table} settings hdfs_ignore_file_doesnt_exist=1"
    )
    assert result == ""
    node1.query(f"drop table {table}")
2021-12-03 05:25:14 +00:00
2022-03-22 16:39:58 +00:00
2022-08-05 16:20:15 +00:00
def test_overwrite(started_cluster):
    """A second insert into an existing file fails unless truncation is enabled.

    With ``hdfs_truncate_on_insert=1`` the insert replaces the file contents,
    so only the 10 new rows remain.
    """
    table_function = "hdfs('hdfs://hdfs1:9000/data', 'Parquet', 'a Int32, b String')"
    node1.query(f"create table test_overwrite as {table_function}")

    insert = "insert into test_overwrite select number, randomString(100)"
    node1.query(f"{insert} from numbers(5)")
    node1.query_and_get_error(f"{insert} FROM numbers(10)")
    node1.query(f"{insert} from numbers(10) settings hdfs_truncate_on_insert=1")

    assert int(node1.query("select count() from test_overwrite")) == 10

    for statement in ("truncate table test_overwrite", "drop table test_overwrite"):
        node1.query(statement)
2021-12-29 18:03:15 +00:00
2022-08-05 16:20:15 +00:00
def test_multiple_inserts(started_cluster):
    """``hdfs_create_new_file_on_insert=1`` lets further inserts go to new files.

    Ten rows are written first; then 20 and 30 rows are inserted with the
    setting enabled, so all 60 rows must be readable. This is checked for a
    plain and a ``.gz``-suffixed Parquet path inside a fresh per-run
    directory, which is removed at the end.
    """
    fs = HdfsClient(hosts=started_cluster.hdfs_ip)
    dir_name = str(uuid.uuid4())  # avoid shadowing the `id` builtin
    fs.mkdirs(f"/{dir_name}/", permission=777)

    insert = "insert into test_multiple_inserts select number, randomString(100)"
    for file_name in ("data_multiple_inserts", "data_multiple_inserts.gz"):
        table_function = (
            f"hdfs('hdfs://hdfs1:9000/{dir_name}/{file_name}', 'Parquet', 'a Int32, b String')"
        )
        node1.query(f"create table test_multiple_inserts as {table_function}")
        node1.query(f"{insert} from numbers(10)")
        for rows in (20, 30):
            node1.query(
                f"{insert} from numbers({rows}) settings hdfs_create_new_file_on_insert=1"
            )

        result = node1.query("select count() from test_multiple_inserts")
        assert int(result) == 60
        node1.query("drop table test_multiple_inserts")

    # The per-run directory used to be leaked; remove it, as test_partition_by does.
    fs.delete(f"/{dir_name}", recursive=True)
2022-03-22 16:39:58 +00:00
2021-12-29 18:03:15 +00:00
2022-08-05 16:20:15 +00:00
def test_format_detection(started_cluster):
    """An HDFS table and hdfs() work on a ``.arrow`` path without a format argument.

    NOTE(review): the format is presumably detected from the extension.
    """
    node1.query(
        "create table arrow_table (x UInt64) engine=HDFS('hdfs://hdfs1:9000/data.arrow')"
    )
    node1.query("insert into arrow_table select 1")

    value = node1.query("select * from hdfs('hdfs://hdfs1:9000/data.arrow')")
    assert int(value) == 1

    for statement in ("truncate table arrow_table", "drop table arrow_table"):
        node1.query(statement)
2022-01-14 11:00:50 +00:00
2021-12-29 18:03:15 +00:00
2022-08-05 16:20:15 +00:00
def test_schema_inference_with_globs(started_cluster):
    """Infer one schema across several JSONCompactEachRow files matched by a glob.

    A NULL-only file plus a file holding ``0`` yields ``Nullable(Int64)``.
    A glob matching only NULL files, or one that includes a file whose content
    is not JSON, must fail to produce a structure.
    """
    hdfs_client = HdfsClient(hosts=started_cluster.hdfs_ip)
    base_dir = "/test_schema_inference_with_globs"
    hdfs_client.mkdirs(base_dir)

    url = f"hdfs://hdfs1:9000{base_dir}"
    no_string_fallback = "input_format_json_infer_incomplete_types_as_strings=0"
    no_cache = f"schema_inference_use_cache_for_hdfs=0, {no_string_fallback}"
    all_files = f"{url}/data*.jsoncompacteachrow"

    def write_json_row(index, value):
        # Write a single-row file data<index>.jsoncompacteachrow holding `value`.
        node1.query(
            f"insert into table function hdfs('{url}/data{index}.jsoncompacteachrow', "
            f"'JSONCompactEachRow', 'x Nullable(UInt32)') select {value}"
        )

    write_json_row(1, "NULL")
    write_json_row(2, "0")

    described = node1.query(f"desc hdfs('{all_files}') settings {no_string_fallback}")
    assert described.strip() == "c1\tNullable(Int64)"

    rows = node1.query(
        f"select * from hdfs('{all_files}') settings {no_string_fallback}"
    )
    assert sorted(rows.split()) == ["0", "\\N"]

    # data1 and data3 both contain only NULL, so neither gives a usable type.
    write_json_row(3, "NULL")
    null_only = "data{1,3}.jsoncompacteachrow"
    error = node1.query_and_get_error(
        f"desc hdfs('{url}/{null_only}') settings {no_cache}"
    )
    assert "All attempts to extract table structure from files failed" in error

    # data0 matches the glob but contains '[123;]', which is not valid JSON.
    node1.query(
        f"insert into table function hdfs('{url}/data0.jsoncompacteachrow', 'TSV', 'x String') select '[123;]'"
    )
    error = node1.query_and_get_error(f"desc hdfs('{all_files}') settings {no_cache}")
    assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in error

    hdfs_client.delete(base_dir, recursive=True)
2022-04-13 16:59:04 +00:00
2022-02-09 16:14:14 +00:00
2022-08-05 16:20:15 +00:00
def test_insert_select_schema_inference(started_cluster):
    """Round-trip a value through ``test.native.zst`` without naming a format.

    Neither the insert nor the reads pass a format argument. The file name ends
    in ``.native.zst``.
    """
    client = HdfsClient(hosts=started_cluster.hdfs_ip)
    path = "/test.native.zst"
    source = f"hdfs('hdfs://hdfs1:9000{path}')"

    node1.query(f"insert into table function {source} select toUInt64(1) as x")

    schema = node1.query(f"desc {source}").strip()
    assert schema == "x\tUInt64"

    value = int(node1.query(f"select * from {source}"))
    assert value == 1

    client.delete(path)
2022-02-18 16:19:42 +00:00
2022-04-07 10:11:49 +00:00
2022-08-05 16:20:15 +00:00
def test_cluster_join(started_cluster):
    """Self-join two ``hdfsCluster`` scans of the same files.

    Both sides expose a column named ``id``. The ``l``/``r`` aliases must be
    enough for the result not to mention an ambiguous column.
    """
    source = (
        "hdfsCluster('test_cluster_two_shards', "
        "'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32')"
    )
    join_query = f"""
        SELECT l.id,r.id FROM {source} as l
        JOIN {source} as r
        ON l.id = r.id
    """
    result = node1.query(join_query)
    assert "AMBIGUOUS_COLUMN_NAME" not in result
2022-02-18 16:19:42 +00:00
2022-08-05 16:20:15 +00:00
def test_cluster_macro(started_cluster):
    """Check that the ``{default_cluster_macro}`` macro reads the same rows as the named cluster.

    Both queries are ordered by ``id``. Without ORDER BY, the row order of a
    distributed ``hdfsCluster`` read is unspecified. The ``TSV`` comparison
    helper (not visible here) presumably compares line by line, so unordered
    results could differ spuriously between the two runs.
    """

    def read_ids(cluster):
        # `cluster` is spliced verbatim, so a macro name reaches the server as-is.
        return node1.query(
            f"""
    SELECT id FROM hdfsCluster('{cluster}', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') ORDER BY id
    """
        )

    with_macro = read_ids("{default_cluster_macro}")
    no_macro = read_ids("test_cluster_two_shards")
    assert TSV(with_macro) == TSV(no_macro)
2022-08-05 16:20:15 +00:00
def test_virtual_columns_2(started_cluster):
    """Check how ``_path`` resolves for files written through ``hdfs()``.

    * With a plain ``a Int32, b String`` structure, ``_path`` is the virtual
      column and yields ``parquet_2``.
    * When the structure declares its own ``_path`` column, the stored value
      (``kek``) is returned instead.
    """
    fs = HdfsClient(hosts=started_cluster.hdfs_ip)

    table_function = (
        "hdfs('hdfs://hdfs1:9000/parquet_2', 'Parquet', 'a Int32, b String')"
    )
    node1.query(f"insert into table function {table_function} SELECT 1, 'kek'")
    result = node1.query(f"SELECT _path FROM {table_function}")
    assert result.strip() == "parquet_2"

    table_function = (
        "hdfs('hdfs://hdfs1:9000/parquet_3', 'Parquet', 'a Int32, _path String')"
    )
    node1.query(f"insert into table function {table_function} SELECT 1, 'kek'")
    result = node1.query(f"SELECT _path FROM {table_function}")
    assert result.strip() == "kek"

    fs.delete("/parquet_2")
    fs.delete("/parquet_3")
2022-03-24 16:10:04 +00:00
2023-08-22 11:59:59 +00:00
def check_profile_event_for_query(node, file, profile_event, amount=1):
    """Assert that the newest finished query mentioning ``file`` reported ``amount`` for ``profile_event``.

    Logs are flushed first, then ``system.query_log`` is searched for the most
    recent ``QueryFinish`` entry whose text contains
    ``hdfs('hdfs://hdfs1:9000/<file>'``.
    """
    node.query("system flush logs")

    # Backslash-escape the quotes so the pattern can live inside the SQL string
    # literal of the LIKE clause. As a side effect, this lookup's own (escaped)
    # text does not match the unescaped pattern.
    pattern = f"hdfs('hdfs://hdfs1:9000/{file}'"
    escaped = pattern.replace("'", "\\'")
    lookup = (
        f"select ProfileEvents['{profile_event}'] from system.query_log "
        f"where query like '%{escaped}%' and type = 'QueryFinish' "
        "order by query_start_time_microseconds desc limit 1"
    )
    observed = int(node.query(lookup))
    assert observed == amount
2022-06-21 13:02:48 +00:00
2022-08-05 16:20:15 +00:00
def check_cache_misses(node1, file, amount=1):
    """Assert that the newest finished query on ``file`` counted ``amount`` schema-cache misses."""
    check_profile_event_for_query(
        node1, file, profile_event="SchemaInferenceCacheMisses", amount=amount
    )
2022-08-05 16:20:15 +00:00
def check_cache_hits(node1, file, amount=1):
    """Assert that the newest finished query on ``file`` counted ``amount`` schema-cache hits."""
    event_name = "SchemaInferenceCacheHits"
    check_profile_event_for_query(node1, file, event_name, amount)
2022-08-05 16:20:15 +00:00
def check_cache_invalidations(node1, file, amount=1):
    """Assert that the newest finished query on ``file`` counted ``amount`` schema-cache invalidations."""
    check_profile_event_for_query(
        node=node1,
        file=file,
        profile_event="SchemaInferenceCacheInvalidations",
        amount=amount,
    )
2022-08-05 16:20:15 +00:00
def check_cache_evictions(node1, file, amount=1):
    """Assert that the newest finished query on ``file`` counted ``amount`` schema-cache evictions."""
    event_name = "SchemaInferenceCacheEvictions"
    check_profile_event_for_query(node1, file, event_name, amount=amount)
def check_cache_num_rows_hits(node1, file, amount=1):
    """Assert that the newest finished query on ``file`` counted ``amount`` cached row-count hits."""
    check_profile_event_for_query(
        node1,
        file,
        profile_event="SchemaInferenceCacheNumRowsHits",
        amount=amount,
    )
2022-08-05 16:20:15 +00:00
def check_cache(node1, expected_files):
    """Assert that ``system.schema_inference_cache`` holds exactly ``expected_files``.

    Entries are compared by the last ``/``-separated component of their
    ``source`` value, ignoring order.
    """
    listing = node1.query("select source from system.schema_inference_cache")
    cached = sorted(entry.strip().split("/")[-1] for entry in listing.split())
    assert cached == sorted(expected_files)
2022-08-05 16:20:15 +00:00
def run_describe_query(node, file):
    """Issue ``desc`` against ``hdfs://hdfs1:9000/<file>`` and discard the result."""
    node.query(f"desc hdfs('hdfs://hdfs1:9000/{file}')")
2023-08-22 11:59:59 +00:00
def run_count_query(node, file):
    """Return the raw ``count()`` output for ``file``, read with auto format and structure ``x UInt64``."""
    source = f"hdfs('hdfs://hdfs1:9000/{file}', auto, 'x UInt64')"
    return node.query(f"select count() from {source}")
2022-06-21 13:02:48 +00:00
def test_schema_inference_cache(started_cluster):
    """Walk the HDFS schema inference cache through misses, hits, invalidations and evictions.

    Each ``check_*`` helper inspects the most recent finished query that
    mentions the given file, so every check must directly follow the query it
    is about. NOTE(review): the eviction expectations imply a cache capacity
    of two entries, presumably configured in configs/schema_cache.xml. Confirm
    there.
    """

    def write(file, rows=100):
        # Overwrite `file` with `rows` integers, then wait one second. The wait
        # presumably lets the new modification time register before the next
        # read. TODO confirm.
        node1.query(
            f"insert into function hdfs('hdfs://hdfs1:9000/{file}') select * from numbers({rows}) settings hdfs_truncate_on_insert=1"
        )
        time.sleep(1)

    def count_rows(file):
        return int(run_count_query(node1, file))

    jsonl0, jsonl1, jsonl2 = "test_cache0.jsonl", "test_cache1.jsonl", "test_cache2.jsonl"

    node1.query("system drop schema cache")

    # The first read of a file misses; the second hits.
    write(jsonl0)
    run_describe_query(node1, jsonl0)
    check_cache(node1, [jsonl0])
    check_cache_misses(node1, jsonl0)
    run_describe_query(node1, jsonl0)
    check_cache_hits(node1, jsonl0)

    # Rewriting the file invalidates its cached schema.
    write(jsonl0)
    run_describe_query(node1, jsonl0)
    check_cache_invalidations(node1, jsonl0)

    write(jsonl1)
    run_describe_query(node1, jsonl1)
    check_cache(node1, [jsonl0, jsonl1])
    check_cache_misses(node1, jsonl1)
    run_describe_query(node1, jsonl1)
    check_cache_hits(node1, jsonl1)

    # A third file evicts an entry: only jsonl1 and jsonl2 remain.
    write(jsonl2)
    run_describe_query(node1, jsonl2)
    check_cache(node1, [jsonl1, jsonl2])
    check_cache_misses(node1, jsonl2)
    check_cache_evictions(node1, jsonl2)
    run_describe_query(node1, jsonl2)
    check_cache_hits(node1, jsonl2)

    run_describe_query(node1, jsonl1)
    check_cache_hits(node1, jsonl1)

    run_describe_query(node1, jsonl0)
    check_cache(node1, [jsonl0, jsonl1])
    check_cache_misses(node1, jsonl0)
    check_cache_evictions(node1, jsonl0)

    run_describe_query(node1, jsonl2)
    check_cache(node1, [jsonl0, jsonl2])
    check_cache_misses(node1, jsonl2)
    check_cache_evictions(node1, jsonl2)

    run_describe_query(node1, jsonl2)
    check_cache_hits(node1, jsonl2)
    run_describe_query(node1, jsonl0)
    check_cache_hits(node1, jsonl0)

    # Globs over four files, with a warm cache and then after each kind of drop.
    write("test_cache3.jsonl")
    jsonl_glob = "test_cache{0,1,2,3}.jsonl"
    run_describe_query(node1, jsonl_glob)
    check_cache_hits(node1, jsonl_glob)

    node1.query("system drop schema cache for hdfs")
    check_cache(node1, [])
    run_describe_query(node1, jsonl_glob)
    check_cache_misses(node1, jsonl_glob, 4)

    node1.query("system drop schema cache")
    check_cache(node1, [])
    run_describe_query(node1, jsonl_glob)
    check_cache_misses(node1, jsonl_glob, 4)

    # count() with an explicit structure over CSV files.
    node1.query("system drop schema cache")
    check_cache(node1, [])
    csv0, csv1 = "test_cache0.csv", "test_cache1.csv"

    write(csv0)
    assert count_rows(csv0) == 100
    check_cache(node1, [csv0])
    check_cache_misses(node1, csv0)
    assert count_rows(csv0) == 100
    check_cache_hits(node1, csv0)

    write(csv0, rows=200)
    assert count_rows(csv0) == 200
    check_cache_invalidations(node1, csv0)

    write(csv1)
    assert count_rows(csv1) == 100
    check_cache(node1, [csv0, csv1])
    check_cache_misses(node1, csv1)
    assert count_rows(csv1) == 100
    check_cache_hits(node1, csv1)

    csv_glob = "test_cache{0,1}.csv"
    assert count_rows(csv_glob) == 300
    check_cache_hits(node1, csv_glob, 2)

    node1.query("system drop schema cache for hdfs")
    check_cache(node1, [])
    assert count_rows(csv_glob) == 300
    check_cache_misses(node1, csv_glob, 2)

    # Parquet: count() with no structure at all.
    node1.query("system drop schema cache for hdfs")
    check_cache(node1, [])
    write("test_cache.parquet")
    total = node1.query(
        "select count() from hdfs('hdfs://hdfs1:9000/test_cache.parquet')"
    )
    assert int(total) == 100
    check_cache_misses(node1, "test_cache.parquet")
    check_cache_hits(node1, "test_cache.parquet")
    check_cache_num_rows_hits(node1, "test_cache.parquet")
2022-06-21 13:02:48 +00:00
2022-11-16 11:47:57 +00:00
def test_hdfsCluster_skip_unavailable_shards(started_cluster):
    """Read through ``cluster_non_existent_port`` with ``skip_unavailable_shards = 1`` set explicitly.

    The cluster is defined in configs/cluster.xml (not visible here). Judging
    by its name, at least one shard is unreachable. The query must still
    return every row written below.
    """
    hdfs_api = started_cluster.hdfs_api
    node = started_cluster.instances["node1"]

    data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
    hdfs_api.write_data("/skip_unavailable_shards", data)

    result = node.query(
        "select * from hdfsCluster('cluster_non_existent_port', 'hdfs://hdfs1:9000/skip_unavailable_shards', 'TSV', 'id UInt64, text String, number Float64') settings skip_unavailable_shards = 1"
    )
    assert result == data
2022-11-15 05:25:15 +00:00
2023-02-23 09:05:51 +00:00
def test_hdfsCluster_unset_skip_unavailable_shards(started_cluster):
    """Read through ``cluster_non_existent_port`` without setting ``skip_unavailable_shards``.

    Although ``skip_unavailable_shards`` is not set here, cluster table
    functions should always skip unavailable shards, so every row written
    below must come back. The cluster is defined in configs/cluster.xml, which
    is not visible here.
    """
    hdfs_api = started_cluster.hdfs_api
    node = started_cluster.instances["node1"]

    data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
    hdfs_api.write_data("/unskip_unavailable_shards", data)

    result = node.query(
        "select * from hdfsCluster('cluster_non_existent_port', 'hdfs://hdfs1:9000/unskip_unavailable_shards', 'TSV', 'id UInt64, text String, number Float64')"
    )
    assert result == data
2022-11-15 05:25:15 +00:00
2023-05-30 19:32:24 +00:00
def test_skip_empty_files(started_cluster):
    """Check ``hdfs_skip_empty_files`` for a single empty file and for a glob.

    ``skip_empty_files1.parquet`` is written as TSVRaw from ``numbers(0)``, so
    it is empty despite its extension. ``skip_empty_files2.parquet`` holds one
    row (``0``). Reading the empty file alone is expected to fail unless the
    setting is on *and* an explicit structure is given. Through the glob, the
    empty file is tolerated only with the setting on.
    """
    node = started_cluster.instances["node1"]
    node.query(
        "insert into function hdfs('hdfs://hdfs1:9000/skip_empty_files1.parquet', TSVRaw) select * from numbers(0) settings hdfs_truncate_on_insert=1"
    )
    node.query(
        "insert into function hdfs('hdfs://hdfs1:9000/skip_empty_files2.parquet') select * from numbers(1) settings hdfs_truncate_on_insert=1"
    )

    def select(path, skip, structure=None):
        # Build a SELECT over `path`, optionally with auto format + explicit structure.
        source = f"'hdfs://hdfs1:9000/{path}'"
        if structure is not None:
            source += f", auto, '{structure}'"
        return f"select * from hdfs({source}) settings hdfs_skip_empty_files={skip}"

    empty_file = "skip_empty_files1.parquet"
    both_files = "skip_empty_files*.parquet"
    # The type name must be spelled exactly. A misspelling such as `UINt64`
    # presumably fails as an unknown type on its own, and the expected error
    # would then prove nothing about hdfs_skip_empty_files.
    structure = "number UInt64"

    node.query_and_get_error(select(empty_file, 0))
    node.query_and_get_error(select(empty_file, 0, structure))
    node.query_and_get_error(select(empty_file, 1))
    res = node.query(select(empty_file, 1, structure))
    assert len(res) == 0

    node.query_and_get_error(select(both_files, 0))
    node.query_and_get_error(select(both_files, 0, structure))
    res = node.query(select(both_files, 1))
    assert int(res) == 0
    res = node.query(select(both_files, 1, structure))
    assert int(res) == 0
2023-07-04 16:50:31 +00:00
def test_read_subcolumns(started_cluster):
    """Read nested tuple subcolumns together with the ``_path``/``_file`` virtual columns.

    Both files are written with a tuple column ``a``. Reading them back as
    ``x`` (a column absent from the files) yields zeros, or the declared
    DEFAULT when one is given.
    """
    node = started_cluster.instances["node1"]
    tuple_type = "Tuple(b Tuple(c UInt32, d UInt32), e UInt32)"
    a_structure = f"a {tuple_type}"
    x_structure = f"x {tuple_type}"

    def source(file, structure):
        return f"hdfs('hdfs://hdfs1:9000/{file}', auto, '{structure}')"

    def read(column, file, structure):
        return node.query(
            f"select {column}.b.d, _path, {column}.b, _file, {column}.e from {source(file, structure)}"
        )

    written = ("test_subcolumns.tsv", "test_subcolumns.jsonl")
    for name in written:
        node.query(
            f"insert into function {source(name, a_structure)} select ((1, 2), 3) settings hdfs_truncate_on_insert=1"
        )
    for name in written:
        assert read("a", name, a_structure) == f"2\t{name}\t(1,2)\t{name}\t3\n"

    jsonl = "test_subcolumns.jsonl"
    assert read("x", jsonl, x_structure) == f"0\t{jsonl}\t(0,0)\t{jsonl}\t0\n"
    with_default = f"{x_structure} default ((42, 42), 42)"
    assert read("x", jsonl, with_default) == f"42\t{jsonl}\t(42,42)\t{jsonl}\t42\n"
2023-07-04 16:50:31 +00:00
2024-06-10 22:43:32 +00:00
def test_read_subcolumn_time(started_cluster):
    """Check that the ``_time`` virtual column of a freshly written file is under 59 minutes old."""
    node = started_cluster.instances["node1"]
    source = "hdfs('hdfs://hdfs1:9000/test_subcolumn_time.tsv', auto, 'a UInt32')"

    node.query(
        f"insert into function {source} select (42) settings hdfs_truncate_on_insert=1"
    )
    row = node.query(f"select a, dateDiff('minute', _time, now()) < 59 from {source}")
    assert row == "42\t1\n"
2024-06-10 22:43:32 +00:00
2023-10-20 20:46:41 +00:00
def test_union_schema_inference_mode(started_cluster):
    """Exercise ``schema_inference_mode='union'`` over files with disjoint columns.

    File 1 holds column ``a`` and file 2 holds column ``b``. Union inference
    must report both, cache one entry per file in UNION mode, and fail once a
    non-JSON file joins the glob.
    """
    run_id = uuid.uuid4()
    hdfs_client = HdfsClient(hosts=started_cluster.hdfs_ip)
    base_dir = f"union_{run_id}"
    hdfs_client.mkdirs(f"/{base_dir}/", permission=777)

    node = started_cluster.instances["node1"]
    prefix = f"hdfs://hdfs1:9000/{base_dir}/test_union_schema_inference"
    union = "settings schema_inference_mode='union', describe_compact_output=1 format TSV"
    both_columns = "a\tNullable(Int64)\nb\tNullable(Int64)\n"

    def describe(suffix):
        return f"desc hdfs('{prefix}{suffix}') {union}"

    node.query(f"insert into function hdfs('{prefix}1.jsonl') select 1 as a")
    node.query(f"insert into function hdfs('{prefix}2.jsonl') select 2 as b")
    node.query("system drop schema cache for hdfs")

    assert node.query(describe("*.jsonl")) == both_columns

    cache_rows = node.query(
        "select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache where source like '%test_union_schema_inference%' order by file format TSV"
    )
    assert cache_rows == (
        "UNION\ttest_union_schema_inference1.jsonl\ta Nullable(Int64)\n"
        "UNION\ttest_union_schema_inference2.jsonl\tb Nullable(Int64)\n"
    )

    rows = node.query(
        f"select * from hdfs('{prefix}*.jsonl') order by tuple(*) {union}"
    )
    assert rows == "1\t\\N\n\\N\t2\n"

    # Warm the cache with file 2 alone, then the glob must still union both files.
    node.query("system drop schema cache for hdfs")
    assert node.query(describe("2.jsonl")) == "b\tNullable(Int64)\n"
    assert node.query(describe("*.jsonl")) == both_columns

    # File 3 contains 'Error' written as TSV, which is not JSON.
    node.query(f"insert into function hdfs('{prefix}3.jsonl', TSV) select 'Error'")
    error = node.query_and_get_error(describe("*.jsonl"))
    assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in error
2023-10-20 20:46:41 +00:00
2024-01-22 22:55:50 +00:00
def test_format_detection(started_cluster):
    """Check that reads without an explicit format match reads that name JSONEachRow.

    ``test_format_detection0`` gets zero rows and ``test_format_detection1``
    gets ten. Neither name carries an extension. Results are compared for
    ``hdfs`` and ``hdfsCluster``, with and without an explicit structure, and
    again after dropping the schema cache.
    """
    node = started_cluster.instances["node1"]
    hdfs_client = HdfsClient(hosts=started_cluster.hdfs_ip)
    run_id = uuid.uuid4()
    base_dir = f"{run_id}"
    hdfs_client.mkdirs(f"/{base_dir}/", permission=777)

    base = f"hdfs://hdfs1:9000/{base_dir}/test_format_detection"
    single = f"'{base}1'"
    both = f"'{base}{{0,1}}'"
    structure = "'x UInt64, y String'"
    ordered = "order by x, y"

    for index, rows in ((0, 0), (1, 10)):
        node.query(
            f"insert into function hdfs('{base}{index}', JSONEachRow) select number as x, 'str_' || toString(number) as y from numbers({rows})"
        )

    expected_desc = node.query(f"desc hdfs({single}, JSONEachRow)")
    assert node.query(f"desc hdfs({single})") == expected_desc

    expected_rows = node.query(
        f"select * from hdfs({single}, JSONEachRow, {structure}) {ordered}"
    )

    def assert_same_rows(source):
        assert node.query(f"select * from {source} {ordered}") == expected_rows

    assert_same_rows(f"hdfs({single})")
    assert_same_rows(f"hdfs({single}, auto, {structure})")
    assert_same_rows(f"hdfs({both})")

    node.query("system drop schema cache for hdfs")
    assert_same_rows(f"hdfs({both})")

    assert_same_rows(f"hdfsCluster(test_cluster_two_shards, {both})")
    assert_same_rows(f"hdfsCluster(test_cluster_two_shards, {both}, auto, auto)")
    assert_same_rows(f"hdfsCluster(test_cluster_two_shards, {both}, auto, {structure})")
2024-04-08 20:37:06 +00:00
def test_write_to_globbed_partitioned_path(started_cluster):
    """Check that a partitioned INSERT into a path containing a ``*`` glob is refused."""
    node = started_cluster.instances["node1"]
    statement = (
        "insert into function hdfs('hdfs://hdfs1:9000/test_data_*_{_partition_id}.csv') "
        "partition by 42 select 42"
    )
    assert "DATABASE_ACCESS_DENIED" in node.query_and_get_error(statement)
2024-05-10 12:28:55 +00:00
2024-04-08 20:18:47 +00:00
def test_respect_object_existence_on_partitioned_write(started_cluster):
    """Check that a partitioned write onto an existing file honours the truncate/create-new settings.

    * ``hdfs_truncate_on_insert=0`` alone results in ``BAD_ARGUMENTS``;
    * ``hdfs_truncate_on_insert=1`` overwrites the file;
    * ``hdfs_create_new_file_on_insert=1`` writes a sibling ``...42.1.csv``.
    """
    node = started_cluster.instances["node1"]
    partitioned = (
        "insert into table function hdfs('hdfs://hdfs1:9000/test_partitioned_write{_partition_id}.csv', CSV) "
        "partition by 42"
    )

    def read(file):
        return int(node.query(f"select * from hdfs('hdfs://hdfs1:9000/{file}', CSV)"))

    node.query(
        "insert into function hdfs('hdfs://hdfs1:9000/test_partitioned_write42.csv', CSV) select 42 settings hdfs_truncate_on_insert=1"
    )
    assert read("test_partitioned_write42.csv") == 42

    error = node.query_and_get_error(
        f"{partitioned} select 42 settings hdfs_truncate_on_insert=0"
    )
    assert "BAD_ARGUMENTS" in error

    node.query(f"{partitioned} select 43 settings hdfs_truncate_on_insert=1")
    assert read("test_partitioned_write42.csv") == 43

    node.query(
        f"{partitioned} select 44 settings hdfs_truncate_on_insert=0, hdfs_create_new_file_on_insert=1"
    )
    assert read("test_partitioned_write42.1.csv") == 44
2024-07-05 12:44:31 +00:00
def test_hive_partitioning_with_one_parameter(started_cluster):
    """Read ``column0`` from a CSV file under a ``column0=Elizabeth`` directory with hive partitioning on."""
    hdfs_api = started_cluster.hdfs_api
    path = "/column0=Elizabeth/file_1"
    payload = "column0,column1\nElizabeth,Gordon\n"

    hdfs_api.write_data(path, payload)
    assert hdfs_api.read_data(path) == payload

    value = node1.query(
        f"SELECT column0 FROM hdfs('hdfs://hdfs1:9000{path}', 'CSVWithNames')",
        settings={"use_hive_partitioning": 1},
    )
    assert value == "Elizabeth\n"
def test_hive_partitioning_without_setting(started_cluster):
    """Check that with ``use_hive_partitioning`` off, selecting ``column1`` fails as an unknown identifier.

    The file sits under ``column0=Elizabeth/column1=Gordon``. Those path
    segments must not be exposed as columns when the setting is disabled.
    """
    hdfs_api = started_cluster.hdfs_api
    path = "/column0=Elizabeth/column1=Gordon/parquet_2"
    payload = "Elizabeth\tGordon\n"

    hdfs_api.write_data(path, payload)
    assert hdfs_api.read_data(path) == payload

    unknown_identifier = re.compile(
        r"DB::Exception: Unknown expression identifier `.*` in scope.*", re.DOTALL
    )
    with pytest.raises(QueryRuntimeException, match=unknown_identifier):
        node1.query(
            f"SELECT column1 FROM hdfs('hdfs://hdfs1:9000{path}', 'TSV');",
            settings={"use_hive_partitioning": 0},
        )
2024-07-05 13:28:47 +00:00
2022-03-22 16:39:58 +00:00
if __name__ == "__main__":
    # Manual mode: bring the cluster up, wait for the operator, then tear it down.
    prompt = "Cluster created, press any key to destroy..."
    cluster.start()
    input(prompt)
    cluster.shutdown()