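"""Integration tests for the ClickHouse HDFS storage engine and the hdfs() /
hdfsCluster() table functions, run against the HDFS instance provided by the
test harness (with_hdfs=True)."""
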
import os

import pytest
from helpers.cluster import ClickHouseCluster
from pyhdfs import HdfsClient

cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_hdfs=True)


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
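

# Round trip through the HDFS storage engine: an INSERT should materialize
# /simple_storage as a TSV file, readable both via the raw HDFS API and SELECT.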
def test_read_write_storage(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    node1.query("drop table if exists SimpleHDFSStorage SYNC")
    node1.query(
        "create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/simple_storage', 'TSV')")
    node1.query("insert into SimpleHDFSStorage values (1, 'Mark', 72.53)")
    assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
    assert node1.query("select * from SimpleHDFSStorage") == "1\tMark\t72.53\n"
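

# Engine URIs may contain glob patterns ({N..M}, {a,b,c}, ?, *). Such tables
# read from every matching file but, judging by the asserted error text, are
# treated as readonly on INSERT.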
def test_read_write_storage_with_globs(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    node1.query(
        "create table HDFSStorageWithRange (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage{1..5}', 'TSV')")
    node1.query(
        "create table HDFSStorageWithEnum (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage{1,2,3,4,5}', 'TSV')")
    node1.query(
        "create table HDFSStorageWithQuestionMark (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage?', 'TSV')")
    node1.query(
        "create table HDFSStorageWithAsterisk (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage*', 'TSV')")

    for i in ["1", "2", "3"]:
        hdfs_api.write_data("/storage" + i, i + "\tMark\t72.53\n")
        assert hdfs_api.read_data("/storage" + i) == i + "\tMark\t72.53\n"

    assert node1.query("select count(*) from HDFSStorageWithRange") == "3\n"
    assert node1.query("select count(*) from HDFSStorageWithEnum") == "3\n"
    assert node1.query("select count(*) from HDFSStorageWithQuestionMark") == "3\n"
    assert node1.query("select count(*) from HDFSStorageWithAsterisk") == "3\n"

    try:
        node1.query("insert into HDFSStorageWithEnum values (1, 'NEW', 4.2)")
        assert False, "Exception has to be thrown"
    except Exception as ex:
        print(ex)
        assert "in readonly mode" in str(ex)

    try:
        node1.query("insert into HDFSStorageWithQuestionMark values (1, 'NEW', 4.2)")
        assert False, "Exception has to be thrown"
    except Exception as ex:
        print(ex)
        assert "in readonly mode" in str(ex)

    try:
        node1.query("insert into HDFSStorageWithAsterisk values (1, 'NEW', 4.2)")
        assert False, "Exception has to be thrown"
    except Exception as ex:
        print(ex)
        assert "in readonly mode" in str(ex)
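

# The hdfs() table function reads a file that was written directly through
# the HDFS API.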
def test_read_write_table(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
    hdfs_api.write_data("/simple_table_function", data)
    assert hdfs_api.read_data("/simple_table_function") == data

    assert node1.query(
        "select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')") == data
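

# INSERT through the storage engine, verified against both the raw file
# contents and a SELECT.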
def test_write_table(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    node1.query(
        "create table OtherHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/other_storage', 'TSV')")
    node1.query("insert into OtherHDFSStorage values (10, 'tomas', 55.55), (11, 'jack', 32.54)")
    result = "10\ttomas\t55.55\n11\tjack\t32.54\n"
    assert hdfs_api.read_data("/other_storage") == result
    assert node1.query("select * from OtherHDFSStorage order by id") == result
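

# Three failure modes: a malformed URL, an unresolvable namenode host, and an
# invalid path should each surface a distinct error message.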
def test_bad_hdfs_uri(started_cluster):
    try:
        node1.query(
            "create table BadStorage1 (id UInt32, name String, weight Float64) ENGINE = HDFS('hads:hgsdfs100500:9000/other_storage', 'TSV')")
    except Exception as ex:
        print(ex)
        assert "Bad hdfs url" in str(ex)
    try:
        node1.query(
            "create table BadStorage2 (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs100500:9000/other_storage', 'TSV')")
    except Exception as ex:
        print(ex)
        assert "Unable to create builder to connect to HDFS" in str(ex)
    try:
        node1.query(
            "create table BadStorage3 (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/<>', 'TSV')")
    except Exception as ex:
        print(ex)
        assert "Unable to open HDFS file" in str(ex)
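

# Each (pattern, paths_amount, files_amount) triple checks the row count and
# the _path/_file virtual columns. files_amount can be lower than paths_amount
# because distinct paths may share the same file name (e.g. some_dir/*/file).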
@pytest.mark.timeout(800)
def test_globs_in_read_table(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    some_data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
    globs_dir = "/dir_for_test_with_globs/"
    files = ["dir1/dir_dir/file1", "dir2/file2", "simple_table_function", "dir/file", "some_dir/dir1/file",
             "some_dir/dir2/file", "some_dir/file", "table1_function", "table2_function", "table3_function"]
    for filename in files:
        hdfs_api.write_data(globs_dir + filename, some_data)

    test_requests = [("dir{1..5}/dir_dir/file1", 1, 1),
                     ("*_table_functio?", 1, 1),
                     ("dir/fil?", 1, 1),
                     ("table{3..8}_function", 1, 1),
                     ("table{2..8}_function", 2, 2),
                     ("dir/*", 1, 1),
                     ("dir/*?*?*?*?*", 1, 1),
                     ("dir/*?*?*?*?*?*", 0, 0),
                     ("some_dir/*/file", 2, 1),
                     ("some_dir/dir?/*", 2, 1),
                     ("*/*/*", 3, 2),
                     ("?", 0, 0)]

    for pattern, paths_amount, files_amount in test_requests:
        inside_table_func = "'hdfs://hdfs1:9000" + globs_dir + pattern + "', 'TSV', 'id UInt64, text String, number Float64'"
        print("inside_table_func ", inside_table_func)
        assert node1.query("select * from hdfs(" + inside_table_func + ")") == paths_amount * some_data
        assert node1.query("select count(distinct _path) from hdfs(" + inside_table_func + ")").rstrip() == str(
            paths_amount)
        assert node1.query("select count(distinct _file) from hdfs(" + inside_table_func + ")").rstrip() == str(
            files_amount)
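

# A .gz extension should make the hdfs() table function decompress the file
# transparently.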
def test_read_write_gzip_table(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
    hdfs_api.write_gzip_data("/simple_table_function.gz", data)
    assert hdfs_api.read_gzip_data("/simple_table_function.gz") == data

    assert node1.query(
        "select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64')") == data
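

# The gzip-compressed file has no .gz extension here, so decompression is
# requested explicitly via the fourth ('gzip') argument.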
def test_read_write_gzip_table_with_parameter_gzip(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
    hdfs_api.write_gzip_data("/simple_table_function", data)
    assert hdfs_api.read_gzip_data("/simple_table_function") == data

    assert node1.query(
        "select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64', 'gzip')") == data
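

# 'none' disables decompression even though the file name ends in .gz; the
# file itself was written uncompressed.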
def test_read_write_table_with_parameter_none(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
    hdfs_api.write_data("/simple_table_function.gz", data)
    assert hdfs_api.read_data("/simple_table_function.gz") == data

    assert node1.query(
        "select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64', 'none')") == data
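

# 'auto' should infer gzip compression from the .gz extension.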
def test_read_write_gzip_table_with_parameter_auto_gz(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
    hdfs_api.write_gzip_data("/simple_table_function.gz", data)
    assert hdfs_api.read_gzip_data("/simple_table_function.gz") == data

    assert node1.query(
        "select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64', 'auto')") == data
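

# Writing through an engine whose URI ends in .gz should produce
# gzip-compressed data on HDFS.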
def test_write_gz_storage(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    node1.query(
        "create table GZHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage.gz', 'TSV')")
    node1.query("insert into GZHDFSStorage values (1, 'Mark', 72.53)")
    assert hdfs_api.read_gzip_data("/storage.gz") == "1\tMark\t72.53\n"
    assert node1.query("select * from GZHDFSStorage") == "1\tMark\t72.53\n"
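

# As above, but compression is selected by the engine's third argument rather
# than by the file extension.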
def test_write_gzip_storage(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    node1.query(
        "create table GZIPHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/gzip_storage', 'TSV', 'gzip')")
    node1.query("insert into GZIPHDFSStorage values (1, 'Mark', 72.53)")
    assert hdfs_api.read_gzip_data("/gzip_storage") == "1\tMark\t72.53\n"
    assert node1.query("select * from GZIPHDFSStorage") == "1\tMark\t72.53\n"
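

# The _file and _path virtual columns report, per row, which globbed file it
# came from; the expected _path values contain a double slash, matching how
# the engine appears to concatenate the URI and file name.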
def test_virtual_columns(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    node1.query("create table virtual_cols (id UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/file*', 'TSV')")
    hdfs_api.write_data("/file1", "1\n")
    hdfs_api.write_data("/file2", "2\n")
    hdfs_api.write_data("/file3", "3\n")
    expected = "1\tfile1\thdfs://hdfs1:9000//file1\n2\tfile2\thdfs://hdfs1:9000//file2\n3\tfile3\thdfs://hdfs1:9000//file3\n"
    assert node1.query("select id, _file as file_name, _path as file_path from virtual_cols order by id") == expected
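

# File names containing spaces should still be matched by the glob and read.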
def test_read_files_with_spaces(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    fs = HdfsClient(hosts=started_cluster.hdfs_ip)
    dir = '/test_spaces'
    exists = fs.exists(dir)
    if exists:
        fs.delete(dir, recursive=True)
    fs.mkdirs(dir)

    hdfs_api.write_data(f"{dir}/test test test 1.txt", "1\n")
    hdfs_api.write_data(f"{dir}/test test test 2.txt", "2\n")
    hdfs_api.write_data(f"{dir}/test test test 3.txt", "3\n")

    node1.query(f"create table test (id UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/{dir}/test*', 'TSV')")
    assert node1.query("select * from test order by id") == "1\n2\n3\n"
    fs.delete(dir, recursive=True)
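

# TRUNCATE TABLE should empty the backing HDFS file.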
def test_truncate_table(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    node1.query(
        "create table test_truncate (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/tr', 'TSV')")
    node1.query("insert into test_truncate values (1, 'Mark', 72.53)")
    assert hdfs_api.read_data("/tr") == "1\tMark\t72.53\n"
    assert node1.query("select * from test_truncate") == "1\tMark\t72.53\n"
    node1.query("truncate table test_truncate")
    assert node1.query("select * from test_truncate") == ""
    node1.query("drop table test_truncate")
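

# PARTITION BY fans one INSERT out into a file per partition key, with
# {_partition_id} in the URI replaced by the key value, both for the hdfs()
# table function and for an engine table.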
def test_partition_by(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    file_name = "test_{_partition_id}"
    partition_by = "column3"
    values = "(1, 2, 3), (3, 2, 1), (1, 3, 2)"
    table_function = f"hdfs('hdfs://hdfs1:9000/{file_name}', 'TSV', '{table_format}')"

    node1.query(f"insert into table function {table_function} PARTITION BY {partition_by} values {values}")
    result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test_1', 'TSV', '{table_format}')")
    assert result.strip() == "3\t2\t1"
    result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test_2', 'TSV', '{table_format}')")
    assert result.strip() == "1\t3\t2"
    result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test_3', 'TSV', '{table_format}')")
    assert result.strip() == "1\t2\t3"

    file_name = "test2_{_partition_id}"
    node1.query(
        f"create table p(column1 UInt32, column2 UInt32, column3 UInt32) engine = HDFS('hdfs://hdfs1:9000/{file_name}', 'TSV') partition by column3")
    node1.query(f"insert into p values {values}")
    result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test2_1', 'TSV', '{table_format}')")
    assert result.strip() == "3\t2\t1"
    result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test2_2', 'TSV', '{table_format}')")
    assert result.strip() == "1\t3\t2"
    result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/test2_3', 'TSV', '{table_format}')")
    assert result.strip() == "1\t2\t3"
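

# Large (5M row) inserts and reads in Parquet and ORC, formats whose readers
# need seekable input.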
def test_seekable_formats(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    table_function = "hdfs('hdfs://hdfs1:9000/parquet', 'Parquet', 'a Int32, b String')"
    node1.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)")
    result = node1.query(f"SELECT count() FROM {table_function}")
    assert int(result) == 5000000

    table_function = "hdfs('hdfs://hdfs1:9000/orc', 'ORC', 'a Int32, b String')"
    node1.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)")
    result = node1.query(f"SELECT count() FROM {table_function}")
    assert int(result) == 5000000
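

# A column missing from the file should be filled from its DEFAULT expression.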
def test_read_table_with_default(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    data = "n\n100\n"
    hdfs_api.write_data("/simple_table_function", data)
    assert hdfs_api.read_data("/simple_table_function") == data

    output = "n\tm\n100\t200\n"
    assert node1.query(
        "select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSVWithNames', 'n UInt32, m UInt32 DEFAULT n * 2') FORMAT TSVWithNames") == output
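

# With the self-describing Native format, the column list can be omitted:
# DESCRIBE on the table function and an engine table created without an
# explicit schema should both infer 'a Int32, b String'.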
def test_schema_inference(started_cluster):
    node1.query("insert into table function hdfs('hdfs://hdfs1:9000/native', 'Native', 'a Int32, b String') SELECT number, randomString(100) FROM numbers(5000000)")

    result = node1.query("desc hdfs('hdfs://hdfs1:9000/native', 'Native')")
    assert result == "a\tInt32\t\t\t\t\t\nb\tString\t\t\t\t\t\n"

    result = node1.query("select count(*) from hdfs('hdfs://hdfs1:9000/native', 'Native')")
    assert int(result) == 5000000

    node1.query("create table schema_inference engine=HDFS('hdfs://hdfs1:9000/native', 'Native')")
    result = node1.query("desc schema_inference")
    assert result == "a\tInt32\t\t\t\t\t\nb\tString\t\t\t\t\t\n"
    result = node1.query("select count(*) from schema_inference")
    assert int(result) == 5000000
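

# hdfsCluster() spreads the globbed read over the shards of
# 'test_cluster_two_shards' (a cluster from the test configuration) and should
# return the same rows as a plain hdfs() call.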
def test_hdfsCluster(started_cluster):
    hdfs_api = started_cluster.hdfs_api
    fs = HdfsClient(hosts=started_cluster.hdfs_ip)
    dir = '/test_hdfsCluster'
    exists = fs.exists(dir)
    if exists:
        fs.delete(dir, recursive=True)
    fs.mkdirs(dir)
    hdfs_api.write_data("/test_hdfsCluster/file1", "1\n")
    hdfs_api.write_data("/test_hdfsCluster/file2", "2\n")
    hdfs_api.write_data("/test_hdfsCluster/file3", "3\n")

    actual = node1.query(
        "select id, _file as file_name, _path as file_path from hdfs('hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') order by id")
    expected = "1\tfile1\thdfs://hdfs1:9000/test_hdfsCluster/file1\n2\tfile2\thdfs://hdfs1:9000/test_hdfsCluster/file2\n3\tfile3\thdfs://hdfs1:9000/test_hdfsCluster/file3\n"
    assert actual == expected

    actual = node1.query(
        "select id, _file as file_name, _path as file_path from hdfsCluster('test_cluster_two_shards', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') order by id")
    expected = "1\tfile1\thdfs://hdfs1:9000/test_hdfsCluster/file1\n2\tfile2\thdfs://hdfs1:9000/test_hdfsCluster/file2\n3\tfile3\thdfs://hdfs1:9000/test_hdfsCluster/file3\n"
    assert actual == expected

    fs.delete(dir, recursive=True)
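

# Selecting from a table whose HDFS path does not exist should return an
# empty result rather than fail.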
def test_hdfs_directory_not_exist(started_cluster):
    ddl = "create table HDFSStorageWithNotExistDir (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/data/not_exist', 'TSV')"
    node1.query(ddl)
    assert "" == node1.query("select * from HDFSStorageWithNotExistDir")
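

# A second insert into an existing file is an error by default;
# hdfs_truncate_on_insert=1 allows the file to be overwritten.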
def test_overwrite(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    table_function = "hdfs('hdfs://hdfs1:9000/data', 'Parquet', 'a Int32, b String')"
    node1.query(f"create table test_overwrite as {table_function}")
    node1.query("insert into test_overwrite select number, randomString(100) from numbers(5)")
    node1.query_and_get_error("insert into test_overwrite select number, randomString(100) FROM numbers(10)")
    node1.query("insert into test_overwrite select number, randomString(100) from numbers(10) settings hdfs_truncate_on_insert=1")

    result = node1.query("select count() from test_overwrite")
    assert int(result) == 10
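

# hdfs_create_new_file_on_insert=1 makes repeated inserts write additional
# files instead of failing, for both plain and .gz paths.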
def test_multiple_inserts(started_cluster):
    hdfs_api = started_cluster.hdfs_api

    table_function = "hdfs('hdfs://hdfs1:9000/data_multiple_inserts', 'Parquet', 'a Int32, b String')"
    node1.query(f"create table test_multiple_inserts as {table_function}")
    node1.query("insert into test_multiple_inserts select number, randomString(100) from numbers(10)")
    node1.query("insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings hdfs_create_new_file_on_insert=1")
    node1.query("insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings hdfs_create_new_file_on_insert=1")
    result = node1.query("select count() from test_multiple_inserts")
    assert int(result) == 60

    node1.query("drop table test_multiple_inserts")

    table_function = "hdfs('hdfs://hdfs1:9000/data_multiple_inserts.gz', 'Parquet', 'a Int32, b String')"
    node1.query(f"create table test_multiple_inserts as {table_function}")
    node1.query("insert into test_multiple_inserts select number, randomString(100) FROM numbers(10)")
    node1.query("insert into test_multiple_inserts select number, randomString(100) FROM numbers(20) settings hdfs_create_new_file_on_insert=1")
    node1.query("insert into test_multiple_inserts select number, randomString(100) FROM numbers(30) settings hdfs_create_new_file_on_insert=1")
    result = node1.query("select count() from test_multiple_inserts")
    assert int(result) == 60
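

# With no format argument, the format should be detected from the .arrow
# file extension.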
def test_format_detection(started_cluster):
    node1.query("create table arrow_table (x UInt64) engine=HDFS('hdfs://hdfs1:9000/data.arrow')")
    node1.query("insert into arrow_table select 1")
    result = node1.query("select * from hdfs('hdfs://hdfs1:9000/data.arrow')")
    assert int(result) == 1


if __name__ == '__main__':
    cluster.start()
    input("Cluster created, press any key to destroy...")
    cluster.shutdown()