2019-12-06 14:37:21 +00:00
# include <Common/config.h>
# if USE_AWS_S3
2019-12-03 16:23:24 +00:00
# include <IO/S3Common.h>
2019-05-31 07:27:14 +00:00
# include <Storages/StorageS3.h>
2019-11-04 19:57:03 +00:00
# include <Interpreters/evaluateConstantExpression.h>
2019-05-31 07:27:14 +00:00
# include <TableFunctions/TableFunctionFactory.h>
# include <TableFunctions/TableFunctionS3.h>
2019-12-11 14:21:48 +00:00
# include <TableFunctions/parseColumnsListForTableFunction.h>
2019-11-04 19:57:03 +00:00
# include <Parsers/ASTLiteral.h>
2019-12-15 06:34:43 +00:00
# include "registerTableFunctions.h"
2019-05-31 07:27:14 +00:00
namespace DB
{
2019-09-22 22:13:42 +00:00
2019-11-04 19:57:03 +00:00
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH ;
}
StoragePtr TableFunctionS3 : : executeImpl ( const ASTPtr & ast_function , const Context & context , const std : : string & table_name ) const
{
/// Parse args
ASTs & args_func = ast_function - > children ;
if ( args_func . size ( ) ! = 1 )
throw Exception ( " Table function ' " + getName ( ) + " ' must have arguments. " , ErrorCodes : : LOGICAL_ERROR ) ;
ASTs & args = args_func . at ( 0 ) - > children ;
if ( args . size ( ) < 3 | | args . size ( ) > 6 )
throw Exception ( " Table function ' " + getName ( ) + " ' requires 3 to 6 arguments: url, [access_key_id, secret_access_key,] format, structure and [compression_method]. " ,
ErrorCodes : : NUMBER_OF_ARGUMENTS_DOESNT_MATCH ) ;
for ( size_t i = 0 ; i < args . size ( ) ; + + i )
args [ i ] = evaluateConstantExpressionOrIdentifierAsLiteral ( args [ i ] , context ) ;
String filename = args [ 0 ] - > as < ASTLiteral & > ( ) . value . safeGet < String > ( ) ;
String format ;
String structure ;
String access_key_id ;
String secret_access_key ;
if ( args . size ( ) < 5 )
{
format = args [ 1 ] - > as < ASTLiteral & > ( ) . value . safeGet < String > ( ) ;
structure = args [ 2 ] - > as < ASTLiteral & > ( ) . value . safeGet < String > ( ) ;
}
else
{
access_key_id = args [ 1 ] - > as < ASTLiteral & > ( ) . value . safeGet < String > ( ) ;
secret_access_key = args [ 2 ] - > as < ASTLiteral & > ( ) . value . safeGet < String > ( ) ;
format = args [ 3 ] - > as < ASTLiteral & > ( ) . value . safeGet < String > ( ) ;
structure = args [ 4 ] - > as < ASTLiteral & > ( ) . value . safeGet < String > ( ) ;
}
String compression_method ;
if ( args . size ( ) = = 4 | | args . size ( ) = = 6 )
compression_method = args . back ( ) - > as < ASTLiteral & > ( ) . value . safeGet < String > ( ) ;
else
compression_method = " auto " ;
ColumnsDescription columns = parseColumnsListFromString ( structure , context ) ;
/// Create table
StoragePtr storage = getStorage ( filename , access_key_id , secret_access_key , format , columns , const_cast < Context & > ( context ) , table_name , compression_method ) ;
storage - > startup ( ) ;
return storage ;
}
2019-05-31 07:27:14 +00:00
StoragePtr TableFunctionS3 : : getStorage (
2019-11-04 19:57:03 +00:00
const String & source ,
const String & access_key_id ,
const String & secret_access_key ,
const String & format ,
const ColumnsDescription & columns ,
Context & global_context ,
const std : : string & table_name ,
const String & compression_method ) const
2019-05-31 07:27:14 +00:00
{
2019-12-06 14:37:21 +00:00
Poco : : URI uri ( source ) ;
S3 : : URI s3_uri ( uri ) ;
2019-09-24 10:58:42 +00:00
UInt64 min_upload_part_size = global_context . getSettingsRef ( ) . s3_min_upload_part_size ;
2019-12-04 16:06:55 +00:00
return StorageS3 : : create ( s3_uri , access_key_id , secret_access_key , StorageID ( getDatabaseName ( ) , table_name ) , format , min_upload_part_size , columns , ConstraintsDescription { } , global_context , compression_method ) ;
2019-05-31 07:27:14 +00:00
}
void registerTableFunctionS3 ( TableFunctionFactory & factory )
{
factory . registerFunction < TableFunctionS3 > ( ) ;
}
2019-09-22 22:13:42 +00:00
2019-05-31 07:27:14 +00:00
}
2019-12-06 14:37:21 +00:00
2019-12-09 12:36:06 +00:00
# endif