2019-12-06 14:37:21 +00:00
# include <Common/config.h>
# if USE_AWS_S3
2019-12-03 16:23:24 +00:00
# include <IO/S3Common.h>
2019-05-23 09:03:39 +00:00
# include <Storages/StorageFactory.h>
2019-05-31 07:27:14 +00:00
# include <Storages/StorageS3.h>
2019-05-23 09:03:39 +00:00
# include <Interpreters/Context.h>
# include <Interpreters/evaluateConstantExpression.h>
# include <Parsers/ASTLiteral.h>
2019-06-01 21:18:20 +00:00
# include <IO/ReadBufferFromS3.h>
2019-11-20 14:48:01 +00:00
# include <IO/ReadHelpers.h>
2019-06-01 21:18:20 +00:00
# include <IO/WriteBufferFromS3.h>
2019-11-20 14:48:01 +00:00
# include <IO/WriteHelpers.h>
2019-05-23 09:03:39 +00:00
# include <Formats/FormatFactory.h>
2019-05-31 07:27:14 +00:00
2019-05-23 09:03:39 +00:00
# include <DataStreams/IBlockOutputStream.h>
2019-05-31 07:27:14 +00:00
# include <DataStreams/IBlockInputStream.h>
2019-05-23 09:03:39 +00:00
# include <DataStreams/AddingDefaultsBlockInputStream.h>
2019-12-03 16:23:24 +00:00
# include <aws/s3/S3Client.h>
2019-05-23 09:03:39 +00:00
2019-05-31 07:27:14 +00:00
2019-05-23 09:03:39 +00:00
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH ;
}
2019-12-03 16:23:24 +00:00
2019-05-31 07:27:14 +00:00
namespace
2019-05-23 09:03:39 +00:00
{
2019-05-31 07:27:14 +00:00
class StorageS3BlockInputStream : public IBlockInputStream
2019-05-23 09:03:39 +00:00
{
2019-05-31 07:27:14 +00:00
public :
2019-12-03 16:23:24 +00:00
StorageS3BlockInputStream (
2019-05-31 07:27:14 +00:00
const String & format ,
const String & name_ ,
const Block & sample_block ,
const Context & context ,
UInt64 max_block_size ,
2019-12-03 16:23:24 +00:00
const CompressionMethod compression_method ,
const std : : shared_ptr < Aws : : S3 : : S3Client > & client ,
const String & bucket ,
const String & key )
2019-05-31 07:27:14 +00:00
: name ( name_ )
{
2020-01-04 07:31:00 +00:00
read_buf = wrapReadBufferWithCompressionMethod ( std : : make_unique < ReadBufferFromS3 > ( client , bucket , key ) , compression_method ) ;
2019-05-31 07:27:14 +00:00
reader = FormatFactory : : instance ( ) . getInput ( format , * read_buf , sample_block , context , max_block_size ) ;
}
2019-05-23 09:03:39 +00:00
2019-05-31 07:27:14 +00:00
String getName ( ) const override
{
return name ;
}
2019-05-23 09:03:39 +00:00
2019-05-31 07:27:14 +00:00
Block readImpl ( ) override
{
return reader - > read ( ) ;
}
2019-05-23 09:03:39 +00:00
2019-05-31 07:27:14 +00:00
Block getHeader ( ) const override
{
return reader - > getHeader ( ) ;
}
2019-05-23 09:03:39 +00:00
2019-05-31 07:27:14 +00:00
void readPrefixImpl ( ) override
{
reader - > readPrefix ( ) ;
}
2019-05-23 09:03:39 +00:00
2019-05-31 07:27:14 +00:00
void readSuffixImpl ( ) override
{
reader - > readSuffix ( ) ;
}
private :
String name ;
2019-11-19 12:46:07 +00:00
std : : unique_ptr < ReadBuffer > read_buf ;
2019-05-31 07:27:14 +00:00
BlockInputStreamPtr reader ;
} ;
class StorageS3BlockOutputStream : public IBlockOutputStream
2019-05-23 09:03:39 +00:00
{
2019-05-31 07:27:14 +00:00
public :
2019-12-03 16:23:24 +00:00
StorageS3BlockOutputStream (
2019-05-31 07:27:14 +00:00
const String & format ,
2019-09-23 12:41:59 +00:00
UInt64 min_upload_part_size ,
2019-05-31 07:27:14 +00:00
const Block & sample_block_ ,
const Context & context ,
2019-12-03 16:23:24 +00:00
const CompressionMethod compression_method ,
const std : : shared_ptr < Aws : : S3 : : S3Client > & client ,
const String & bucket ,
const String & key )
2019-05-31 07:27:14 +00:00
: sample_block ( sample_block_ )
{
2020-01-04 07:31:00 +00:00
write_buf = wrapWriteBufferWithCompressionMethod (
2020-01-05 06:39:06 +00:00
std : : make_unique < WriteBufferFromS3 > ( client , bucket , key , min_upload_part_size ) , compression_method , 3 ) ;
2019-05-31 07:27:14 +00:00
writer = FormatFactory : : instance ( ) . getOutput ( format , * write_buf , sample_block , context ) ;
}
Block getHeader ( ) const override
{
return sample_block ;
}
void write ( const Block & block ) override
{
writer - > write ( block ) ;
}
void writePrefix ( ) override
{
writer - > writePrefix ( ) ;
}
void writeSuffix ( ) override
{
writer - > writeSuffix ( ) ;
writer - > flush ( ) ;
write_buf - > finalize ( ) ;
}
private :
Block sample_block ;
2019-11-19 12:46:07 +00:00
std : : unique_ptr < WriteBuffer > write_buf ;
2019-05-31 07:27:14 +00:00
BlockOutputStreamPtr writer ;
} ;
}
2019-05-23 09:03:39 +00:00
2019-12-04 16:06:55 +00:00
/// Creates an S3-backed table.
/// @param uri_                 Parsed S3 location (endpoint, bucket, key).
/// @param access_key_id_       Credential passed to S3::ClientFactory (may be empty).
/// @param secret_access_key_   Credential passed to S3::ClientFactory (may be empty).
/// @param table_id_            Identity of the table.
/// @param format_name_         Data format of the object.
/// @param min_upload_part_size_ Forwarded to the write path.
/// @param columns_             Table columns.
/// @param constraints_         Table constraints.
/// @param context_             Global context kept for writes and the remote-host check.
/// @param compression_method_  Compression hint; empty by default.
/// The URL is validated against the context's remote host filter after the client is created.
StorageS3::StorageS3(
    const S3::URI & uri_,
    const String & access_key_id_,
    const String & secret_access_key_,
    const StorageID & table_id_,
    const String & format_name_,
    UInt64 min_upload_part_size_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    Context & context_,
    const String & compression_method_ = "")
    : IStorage(table_id_, columns_)
    , uri(uri_)
    , context_global(context_)
    , format_name(format_name_)
    , min_upload_part_size(min_upload_part_size_)
    , compression_method(compression_method_)
    , client(S3::ClientFactory::instance().create(uri_.endpoint, access_key_id_, secret_access_key_))
{
    /// All members are constructed here, so the stored copy of the URI is used.
    context_global.getRemoteHostFilter().checkURL(uri.uri);

    setColumns(columns_);
    setConstraints(constraints_);
}
/// Returns a single stream reading the configured S3 object.
/// When the table declares column defaults, the stream is wrapped in
/// AddingDefaultsBlockInputStream so that missing values are filled in.
BlockInputStreams StorageS3::read(
    const Names & column_names,
    const SelectQueryInfo & /*query_info*/,
    const Context & context,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t max_block_size,
    unsigned /*num_streams*/)
{
    /// The compression hint is resolved against the object key, not the endpoint.
    /// The key is the part of the location that carries the file name/extension
    /// (e.g. "data.csv.gz"). The endpoint is only the service host, so "auto"
    /// detection based on it could never find a compression extension.
    BlockInputStreamPtr block_input = std::make_shared<StorageS3BlockInputStream>(
        format_name,
        getName(),
        getHeaderBlock(column_names),
        context,
        max_block_size,
        chooseCompressionMethod(uri.key, compression_method),
        client,
        uri.bucket,
        uri.key);

    auto column_defaults = getColumns().getDefaults();
    if (column_defaults.empty())
        return {block_input};
    return {std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context)};
}
2019-06-01 21:18:20 +00:00
/// Returns a stream that writes to the configured S3 object, using the storage's
/// global context, format, upload part size and compression setting.
BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const Context & /*context*/)
{
    /// Resolve the compression hint against the object key (where a ".gz" etc.
    /// extension lives), not the endpoint host — consistent with read().
    return std::make_shared<StorageS3BlockOutputStream>(
        format_name,
        min_upload_part_size,
        getSampleBlock(),
        context_global,
        chooseCompressionMethod(uri.key, compression_method),
        client,
        uri.bucket,
        uri.key);
}
2019-06-17 00:06:14 +00:00
/// Registers the "S3" table engine. Accepted engine argument layouts:
///   S3(url, format)
///   S3(url, format, compression_method)
///   S3(url, access_key_id, secret_access_key, format)
///   S3(url, access_key_id, secret_access_key, format, compression_method)
/// If no compression method is given, "auto" is used.
void registerStorageS3(StorageFactory & factory)
{
    factory.registerStorage("S3", [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;

        if (engine_args.size() < 2 || engine_args.size() > 5)
            throw Exception(
                "Storage S3 requires 2 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        for (size_t i = 0; i < engine_args.size(); ++i)
            engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context);

        /// Reads engine argument `index` as a string literal.
        auto get_string_arg = [&engine_args](size_t index) -> String
        {
            return engine_args[index]->as<ASTLiteral &>().value.safeGet<String>();
        };

        String url = get_string_arg(0);
        Poco::URI uri(url);
        S3::URI s3_uri(uri);

        /// A trailing compression method is present exactly for 3 or 5 arguments.
        /// In that case the format is the second-to-last argument, not the last one.
        /// Reading the last argument as the format would use the compression method
        /// name as the format.
        const bool has_compression_method = engine_args.size() == 3 || engine_args.size() == 5;

        String format_name;
        String compression_method;
        if (has_compression_method)
        {
            format_name = get_string_arg(engine_args.size() - 2);
            compression_method = get_string_arg(engine_args.size() - 1);
        }
        else
        {
            format_name = get_string_arg(engine_args.size() - 1);
            compression_method = "auto";
        }

        String access_key_id;
        String secret_access_key;
        if (engine_args.size() >= 4)
        {
            access_key_id = get_string_arg(1);
            secret_access_key = get_string_arg(2);
        }

        UInt64 min_upload_part_size = args.local_context.getSettingsRef().s3_min_upload_part_size;

        /// compression_method must be forwarded explicitly. Otherwise the constructor's
        /// default ("") is used and the user's choice is silently ignored.
        return StorageS3::create(
            s3_uri,
            access_key_id,
            secret_access_key,
            args.table_id,
            format_name,
            min_upload_part_size,
            args.columns,
            args.constraints,
            args.context,
            compression_method);
    });
}
2019-12-11 14:21:48 +00:00
2019-05-23 09:03:39 +00:00
}
2019-12-11 14:21:48 +00:00
2019-12-09 12:36:06 +00:00
# endif