2022-09-28 08:45:15 +00:00
# include "config.h"
2019-12-06 14:37:21 +00:00
# if USE_AWS_S3
2019-12-03 16:23:24 +00:00
# include <IO/S3Common.h>
2019-11-04 19:57:03 +00:00
# include <Interpreters/evaluateConstantExpression.h>
2020-02-10 15:50:12 +00:00
# include <Interpreters/Context.h>
2019-05-31 07:27:14 +00:00
# include <TableFunctions/TableFunctionFactory.h>
# include <TableFunctions/TableFunctionS3.h>
2022-08-16 09:41:32 +00:00
# include <Interpreters/parseColumnsListForTableFunction.h>
2022-09-04 16:58:39 +00:00
# include <Access/Common/AccessFlags.h>
2019-11-04 19:57:03 +00:00
# include <Parsers/ASTLiteral.h>
2023-05-12 13:58:45 +00:00
# include <Parsers/ASTIdentifier.h>
# include <Parsers/ASTFunction.h>
2022-06-23 20:04:06 +00:00
# include <Storages/checkAndGetLiteralArgument.h>
2021-09-07 11:17:25 +00:00
# include <Storages/StorageS3.h>
2022-09-13 13:07:43 +00:00
# include <Storages/StorageURL.h>
2022-12-16 22:57:09 +00:00
# include <Storages/NamedCollectionsHelpers.h>
2021-12-15 11:30:57 +00:00
# include <Formats/FormatFactory.h>
2019-12-15 06:34:43 +00:00
# include "registerTableFunctions.h"
2023-06-27 11:47:24 +00:00
# include <Analyzer/FunctionNode.h>
# include <Analyzer/TableFunctionNode.h>
2019-05-31 07:27:14 +00:00
2023-03-27 14:44:34 +00:00
# include <boost/algorithm/string.hpp>
2021-04-22 01:25:40 +00:00
2019-05-31 07:27:14 +00:00
namespace DB
{
2019-09-22 22:13:42 +00:00
2019-11-04 19:57:03 +00:00
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH ;
2023-05-15 16:39:26 +00:00
extern const int LOGICAL_ERROR ;
2019-11-04 19:57:03 +00:00
}
2021-09-07 11:17:25 +00:00
2023-06-27 11:47:24 +00:00
std : : vector < size_t > TableFunctionS3 : : skipAnalysisForArguments ( const QueryTreeNodePtr & query_node_table_function , ContextPtr ) const
{
auto & table_function_node = query_node_table_function - > as < TableFunctionNode & > ( ) ;
auto & table_function_arguments_nodes = table_function_node . getArguments ( ) . getNodes ( ) ;
size_t table_function_arguments_size = table_function_arguments_nodes . size ( ) ;
std : : vector < size_t > result ;
for ( size_t i = 0 ; i < table_function_arguments_size ; + + i )
{
auto * function_node = table_function_arguments_nodes [ i ] - > as < FunctionNode > ( ) ;
if ( function_node & & function_node - > getFunctionName ( ) = = " headers " )
result . push_back ( i ) ;
}
return result ;
}
2022-03-28 22:46:35 +00:00
/// This is needed to avoid copy-pase. Because s3Cluster arguments only differ in additional argument (first) - cluster name
2023-05-12 13:58:45 +00:00
void TableFunctionS3 : : parseArgumentsImpl ( ASTs & args , const ContextPtr & context )
2022-03-28 22:46:35 +00:00
{
2023-03-28 13:39:59 +00:00
if ( auto named_collection = tryGetNamedCollectionWithOverrides ( args , context ) )
2021-09-07 11:17:25 +00:00
{
2023-05-12 13:58:45 +00:00
StorageS3 : : processNamedCollectionResult ( configuration , * named_collection ) ;
2021-09-07 11:17:25 +00:00
}
else
{
2019-11-04 19:57:03 +00:00
2023-05-12 13:58:45 +00:00
auto * header_it = StorageURL : : collectHeaders ( args , configuration . headers_from_ast , context ) ;
2022-09-13 13:07:43 +00:00
if ( header_it ! = args . end ( ) )
args . erase ( header_it ) ;
2023-06-27 16:38:29 +00:00
if ( args . empty ( ) | | args . size ( ) > 6 )
throw Exception ( ErrorCodes : : NUMBER_OF_ARGUMENTS_DOESNT_MATCH , " The signature of table function {} shall be the following: \n {} " , getName ( ) , getSignature ( ) ) ;
2021-09-07 11:17:25 +00:00
for ( auto & arg : args )
arg = evaluateConstantExpressionOrIdentifierAsLiteral ( arg , context ) ;
2019-11-04 19:57:03 +00:00
2021-09-07 11:17:25 +00:00
/// Size -> argument indexes
2023-02-13 14:45:12 +00:00
static std : : unordered_map < size_t , std : : unordered_map < std : : string_view , size_t > > size_to_args
2021-09-07 11:17:25 +00:00
{
2022-01-12 15:28:13 +00:00
{ 1 , { { } } } ,
2021-09-07 11:17:25 +00:00
{ 6 , { { " access_key_id " , 1 } , { " secret_access_key " , 2 } , { " format " , 3 } , { " structure " , 4 } , { " compression_method " , 5 } } }
} ;
2019-11-04 19:57:03 +00:00
2023-02-13 14:45:12 +00:00
std : : unordered_map < std : : string_view , size_t > args_to_idx ;
2022-02-15 11:57:38 +00:00
2023-03-27 14:44:34 +00:00
bool no_sign_request = false ;
/// For 2 arguments we support 2 possible variants:
/// - s3(source, format)
/// - s3(source, NOSIGN)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or not.
if ( args . size ( ) = = 2 )
{
auto second_arg = checkAndGetLiteralArgument < String > ( args [ 1 ] , " format/NOSIGN " ) ;
if ( boost : : iequals ( second_arg , " NOSIGN " ) )
no_sign_request = true ;
2022-02-15 11:57:38 +00:00
else
2023-03-27 14:44:34 +00:00
args_to_idx = { { " format " , 1 } } ;
2021-12-15 11:30:57 +00:00
}
2023-03-27 14:44:34 +00:00
/// For 3 arguments we support 3 possible variants:
/// - s3(source, format, structure)
/// - s3(source, access_key_id, access_key_id)
/// - s3(source, NOSIGN, format)
2022-02-10 15:57:02 +00:00
/// We can distinguish them by looking at the 2-nd argument: check if it's a format name or not.
else if ( args . size ( ) = = 3 )
{
2023-03-27 14:44:34 +00:00
auto second_arg = checkAndGetLiteralArgument < String > ( args [ 1 ] , " format/access_key_id/NOSIGN " ) ;
if ( boost : : iequals ( second_arg , " NOSIGN " ) )
{
no_sign_request = true ;
args_to_idx = { { " format " , 2 } } ;
}
else if ( second_arg = = " auto " | | FormatFactory : : instance ( ) . getAllFormats ( ) . contains ( second_arg ) )
2022-02-10 15:57:02 +00:00
args_to_idx = { { " format " , 1 } , { " structure " , 2 } } ;
else
args_to_idx = { { " access_key_id " , 1 } , { " secret_access_key " , 2 } } ;
}
2023-03-27 14:44:34 +00:00
/// For 4 arguments we support 3 possible variants:
/// - s3(source, format, structure, compression_method),
/// - s3(source, access_key_id, access_key_id, format)
/// - s3(source, NOSIGN, format, structure)
/// We can distinguish them by looking at the 2-nd argument: check if it's a format name or not.
else if ( args . size ( ) = = 4 )
{
auto second_arg = checkAndGetLiteralArgument < String > ( args [ 1 ] , " format/access_key_id/NOSIGN " ) ;
if ( boost : : iequals ( second_arg , " NOSIGN " ) )
{
no_sign_request = true ;
args_to_idx = { { " format " , 2 } , { " structure " , 3 } } ;
}
else if ( second_arg = = " auto " | | FormatFactory : : instance ( ) . getAllFormats ( ) . contains ( second_arg ) )
args_to_idx = { { " format " , 1 } , { " structure " , 2 } , { " compression_method " , 3 } } ;
else
args_to_idx = { { " access_key_id " , 1 } , { " secret_access_key " , 2 } , { " format " , 3 } } ;
}
/// For 5 arguments we support 2 possible variants:
/// - s3(source, access_key_id, access_key_id, format, structure)
/// - s3(source, NOSIGN, format, structure, compression_method)
2023-03-29 11:08:44 +00:00
/// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN keyword name or not.
2023-03-27 14:44:34 +00:00
else if ( args . size ( ) = = 5 )
{
auto second_arg = checkAndGetLiteralArgument < String > ( args [ 1 ] , " NOSIGN/access_key_id " ) ;
if ( boost : : iequals ( second_arg , " NOSIGN " ) )
{
no_sign_request = true ;
args_to_idx = { { " format " , 2 } , { " structure " , 3 } , { " compression_method " , 4 } } ;
}
else
args_to_idx = { { " access_key_id " , 1 } , { " secret_access_key " , 2 } , { " format " , 3 } , { " structure " , 4 } } ;
}
2021-12-15 11:30:57 +00:00
else
{
args_to_idx = size_to_args [ args . size ( ) ] ;
}
2021-09-07 11:17:25 +00:00
/// This argument is always the first
2023-05-12 13:58:45 +00:00
configuration . url = S3 : : URI ( checkAndGetLiteralArgument < String > ( args [ 0 ] , " url " ) ) ;
2021-04-13 19:57:01 +00:00
2021-09-07 11:17:25 +00:00
if ( args_to_idx . contains ( " format " ) )
2023-05-04 07:56:00 +00:00
{
2023-05-22 19:19:57 +00:00
auto format = checkAndGetLiteralArgument < String > ( args [ args_to_idx [ " format " ] ] , " format " ) ;
/// Set format to configuration only of it's not 'auto',
/// because we can have default format set in configuration.
if ( format ! = " auto " )
configuration . format = format ;
2023-05-04 07:56:00 +00:00
}
2021-04-13 19:57:01 +00:00
2021-09-07 11:17:25 +00:00
if ( args_to_idx . contains ( " structure " ) )
2023-05-12 13:58:45 +00:00
configuration . structure = checkAndGetLiteralArgument < String > ( args [ args_to_idx [ " structure " ] ] , " structure " ) ;
2021-04-13 19:57:01 +00:00
2021-09-07 11:17:25 +00:00
if ( args_to_idx . contains ( " compression_method " ) )
2023-05-12 13:58:45 +00:00
configuration . compression_method = checkAndGetLiteralArgument < String > ( args [ args_to_idx [ " compression_method " ] ] , " compression_method " ) ;
2021-04-13 19:57:01 +00:00
2021-09-07 11:17:25 +00:00
if ( args_to_idx . contains ( " access_key_id " ) )
2023-05-12 13:58:45 +00:00
configuration . auth_settings . access_key_id = checkAndGetLiteralArgument < String > ( args [ args_to_idx [ " access_key_id " ] ] , " access_key_id " ) ;
2021-04-13 19:57:01 +00:00
2021-09-07 11:17:25 +00:00
if ( args_to_idx . contains ( " secret_access_key " ) )
2023-05-12 13:58:45 +00:00
configuration . auth_settings . secret_access_key = checkAndGetLiteralArgument < String > ( args [ args_to_idx [ " secret_access_key " ] ] , " secret_access_key " ) ;
2023-03-27 14:44:34 +00:00
2023-05-12 13:58:45 +00:00
configuration . auth_settings . no_sign_request = no_sign_request ;
2021-09-07 11:17:25 +00:00
}
2021-04-13 19:57:01 +00:00
2023-05-12 13:58:45 +00:00
configuration . keys = { configuration . url . key } ;
2023-03-30 13:32:38 +00:00
2023-05-12 13:58:45 +00:00
if ( configuration . format = = " auto " )
configuration . format = FormatFactory : : instance ( ) . getFormatFromFileName ( configuration . url . uri . getPath ( ) , true ) ;
2022-03-28 22:46:35 +00:00
}
void TableFunctionS3 : : parseArguments ( const ASTPtr & ast_function , ContextPtr context )
{
2023-05-12 13:58:45 +00:00
/// Clone ast function, because we can modify its arguments like removing headers.
auto ast_copy = ast_function - > clone ( ) ;
2022-03-28 22:46:35 +00:00
/// Parse args
ASTs & args_func = ast_function - > children ;
if ( args_func . size ( ) ! = 1 )
2023-01-23 21:13:58 +00:00
throw Exception ( ErrorCodes : : NUMBER_OF_ARGUMENTS_DOESNT_MATCH , " Table function '{}' must have arguments. " , getName ( ) ) ;
2022-03-28 22:46:35 +00:00
auto & args = args_func . at ( 0 ) - > children ;
2022-01-12 15:28:13 +00:00
2023-05-12 13:58:45 +00:00
parseArgumentsImpl ( args , context ) ;
}
void TableFunctionS3 : : addColumnsStructureToArguments ( ASTs & args , const String & structure , const ContextPtr & context )
{
if ( tryGetNamedCollectionWithOverrides ( args , context ) )
{
/// In case of named collection, just add key-value pair "structure='...'"
/// at the end of arguments to override existed structure.
ASTs equal_func_args = { std : : make_shared < ASTIdentifier > ( " structure " ) , std : : make_shared < ASTLiteral > ( structure ) } ;
auto equal_func = makeASTFunction ( " equals " , std : : move ( equal_func_args ) ) ;
args . push_back ( equal_func ) ;
}
else
{
/// If arguments contain headers, just remove it and add to the end of arguments later
/// (header argument can be at any position).
HTTPHeaderEntries tmp_headers ;
auto * headers_it = StorageURL : : collectHeaders ( args , tmp_headers , context ) ;
ASTPtr headers_ast ;
if ( headers_it ! = args . end ( ) )
{
headers_ast = * headers_it ;
args . erase ( headers_it ) ;
}
if ( args . empty ( ) | | args . size ( ) > getMaxNumberOfArguments ( ) )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Expected 1 to {} arguments in table function, got {} " , getMaxNumberOfArguments ( ) , args . size ( ) ) ;
auto structure_literal = std : : make_shared < ASTLiteral > ( structure ) ;
/// s3(s3_url)
if ( args . size ( ) = = 1 )
{
/// Add format=auto before structure argument.
args . push_back ( std : : make_shared < ASTLiteral > ( " auto " ) ) ;
args . push_back ( structure_literal ) ;
}
/// s3(s3_url, format) or s3(s3_url, NOSIGN)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or not.
else if ( args . size ( ) = = 2 )
{
auto second_arg = checkAndGetLiteralArgument < String > ( args [ 1 ] , " format/NOSIGN " ) ;
/// If there is NOSIGN, add format=auto before structure.
if ( boost : : iequals ( second_arg , " NOSIGN " ) )
args . push_back ( std : : make_shared < ASTLiteral > ( " auto " ) ) ;
args . push_back ( structure_literal ) ;
}
/// s3(source, format, structure) or
/// s3(source, access_key_id, access_key_id) or
/// s3(source, NOSIGN, format)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN, format name or neither.
else if ( args . size ( ) = = 3 )
{
auto second_arg = checkAndGetLiteralArgument < String > ( args [ 1 ] , " format/NOSIGN " ) ;
if ( boost : : iequals ( second_arg , " NOSIGN " ) )
{
args . push_back ( structure_literal ) ;
}
else if ( second_arg = = " auto " | | FormatFactory : : instance ( ) . getAllFormats ( ) . contains ( second_arg ) )
{
args . back ( ) = structure_literal ;
}
else
{
/// Add format=auto before structure argument.
args . push_back ( std : : make_shared < ASTLiteral > ( " auto " ) ) ;
args . push_back ( structure_literal ) ;
}
}
/// s3(source, format, structure, compression_method) or
/// s3(source, access_key_id, access_key_id, format) or
/// s3(source, NOSIGN, format, structure)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN, format name or neither.
else if ( args . size ( ) = = 4 )
{
auto second_arg = checkAndGetLiteralArgument < String > ( args [ 1 ] , " format/NOSIGN " ) ;
if ( boost : : iequals ( second_arg , " NOSIGN " ) )
{
args . back ( ) = structure_literal ;
}
else if ( second_arg = = " auto " | | FormatFactory : : instance ( ) . getAllFormats ( ) . contains ( second_arg ) )
{
args [ args . size ( ) - 2 ] = structure_literal ;
}
else
{
args . push_back ( structure_literal ) ;
}
}
/// s3(source, access_key_id, access_key_id, format, structure) or
/// s3(source, NOSIGN, format, structure, compression_method)
/// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN keyword name or not.
else if ( args . size ( ) = = 5 )
{
auto sedond_arg = checkAndGetLiteralArgument < String > ( args [ 1 ] , " format/NOSIGN " ) ;
if ( boost : : iequals ( sedond_arg , " NOSIGN " ) )
{
args [ args . size ( ) - 2 ] = structure_literal ;
}
else
{
args . back ( ) = structure_literal ;
}
}
/// s3(source, access_key_id, access_key_id, format, structure, compression)
else if ( args . size ( ) = = 6 )
{
args [ args . size ( ) - 2 ] = structure_literal ;
}
if ( headers_ast )
args . push_back ( headers_ast ) ;
}
2020-10-14 12:19:29 +00:00
}
2019-11-04 19:57:03 +00:00
2021-04-10 23:33:54 +00:00
/// Returns the columns of the table function.
/// An explicitly specified structure is parsed from its string form; for structure 'auto'
/// the source access is checked, the configuration is updated and the structure is obtained from the data.
ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context) const
{
    const bool structure_is_explicit = configuration.structure != "auto";
    if (structure_is_explicit)
        return parseColumnsListFromString(configuration.structure, context);

    /// Access to the source is checked before the configuration is updated and the data is touched.
    context->checkAccess(getSourceAccessType());
    configuration.update(context);
    return StorageS3::getTableStructureFromData(configuration, std::nullopt, context);
}
2019-11-04 19:57:03 +00:00
2022-10-14 15:09:35 +00:00
bool TableFunctionS3 : : supportsReadingSubsetOfColumns ( )
{
return FormatFactory : : instance ( ) . checkIfFormatSupportsSubsetOfColumns ( configuration . format ) ;
}
2021-04-10 23:33:54 +00:00
/// Creates a StorageS3 for the parsed configuration, starts it up and returns it.
/// Columns are taken from the explicit structure if it is specified, otherwise from
/// `structure_hint` if it is not empty, otherwise they are left empty.
StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
    ColumnsDescription columns;
    if (configuration.structure != "auto")
        columns = parseColumnsListFromString(configuration.structure, context);
    else if (!structure_hint.empty())
        columns = structure_hint;

    StoragePtr storage = std::make_shared<StorageS3>(
        configuration,
        context,
        StorageID(getDatabaseName(), table_name),
        columns,
        ConstraintsDescription{},
        String{},
        /// No format_settings for table function S3
        std::nullopt);

    storage->startup();

    return storage;
}
2019-05-31 07:27:14 +00:00
2023-04-27 13:58:18 +00:00
class TableFunctionGCS : public TableFunctionS3
{
public :
static constexpr auto name = " gcs " ;
std : : string getName ( ) const override
{
return name ;
}
private :
const char * getStorageTypeName ( ) const override { return " GCS " ; }
} ;
class TableFunctionCOS : public TableFunctionS3
{
public :
static constexpr auto name = " cosn " ;
std : : string getName ( ) const override
{
return name ;
}
private :
const char * getStorageTypeName ( ) const override { return " COSN " ; }
} ;
class TableFunctionOSS : public TableFunctionS3
{
public :
static constexpr auto name = " oss " ;
std : : string getName ( ) const override
{
return name ;
}
private :
const char * getStorageTypeName ( ) const override { return " OSS " ; }
} ;
/// Registers the `gcs` table function in the factory together with its documentation.
void registerTableFunctionGCS(TableFunctionFactory & factory)
{
    factory.registerFunction<TableFunctionGCS>(
        {
            .documentation =
            {
                .description = R"(The table function can be used to read the data stored on Google Cloud Storage.)",
                .examples{{"gcs", "SELECT * FROM gcs(url, hmac_key, hmac_secret)", ""}},
                .categories{"DataLake"}
            },
            .allow_readonly = false
        });
}
2019-05-31 07:27:14 +00:00
/// Registers the `s3` table function in the factory together with its documentation.
void registerTableFunctionS3(TableFunctionFactory & factory)
{
    factory.registerFunction<TableFunctionS3>(
        {
            .documentation =
            {
                .description = R"(The table function can be used to read the data stored on AWS S3.)",
                .examples{{"s3", "SELECT * FROM s3(url, access_key_id, secret_access_key)", ""}},
                .categories{"DataLake"}
            },
            .allow_readonly = false
        });
}
2019-09-22 22:13:42 +00:00
2023-05-16 10:07:21 +00:00
2020-07-13 14:13:30 +00:00
/// Registers the `cosn` table function in the factory (with default properties).
void registerTableFunctionCOS(TableFunctionFactory & factory)
{
    factory.registerFunction<TableFunctionCOS>();
}
2022-11-11 08:40:10 +00:00
/// Registers the `oss` table function in the factory (with default properties).
void registerTableFunctionOSS(TableFunctionFactory & factory)
{
    factory.registerFunction<TableFunctionOSS>();
}
2019-05-31 07:27:14 +00:00
}
2019-12-06 14:37:21 +00:00
2019-12-09 12:36:06 +00:00
# endif