#include <Common/config.h>

#if USE_AWS_S3

#include <IO/S3Common.h>

#include <Storages/StorageFactory.h>
#include <Storages/StorageS3.h>

#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>

#include <IO/ReadBufferFromS3.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>

#include <Formats/FormatFactory.h>

#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <DataStreams/narrowBlockInputStreams.h>

#include <DataTypes/DataTypeString.h>

#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsRequest.h>

#include <Common/parseGlobs.h>
#include <re2/re2.h>

#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>


namespace DB
{
namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int UNEXPECTED_EXPRESSION;
    extern const int S3_ERROR;
}


namespace
{
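/** A source that reads one S3 object: the object is downloaded through ReadBufferFromS3,
  * optionally decompressed, parsed with the requested input format and emitted as Chunks.
  * When requested, the virtual columns _path ("bucket/key") and _file (the part of the key
  * after the last '/') are appended to every produced chunk.
  */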
class StorageS3Source : public SourceWithProgress
{
public:
    static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column)
    {
        if (with_path_column)
            sample_block.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_path"});
        if (with_file_column)
            sample_block.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_file"});

        return sample_block;
    }
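
    /// Wires the read pipeline: ReadBufferFromS3 -> optional decompression -> input format,
    /// plus an AddingDefaultsBlockInputStream when the table has column defaults.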
    StorageS3Source(
        bool need_path,
        bool need_file,
        const String & format,
        String name_,
        const Block & sample_block,
        const Context & context,
        const ColumnDefaults & column_defaults,
        UInt64 max_block_size,
        const CompressionMethod compression_method,
        const std::shared_ptr<Aws::S3::S3Client> & client,
        const String & bucket,
        const String & key)
        : SourceWithProgress(getHeader(sample_block, need_path, need_file))
        , name(std::move(name_))
        , with_file_column(need_file)
        , with_path_column(need_path)
        , file_path(bucket + "/" + key)
    {
        read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromS3>(client, bucket, key), compression_method);
        reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);

        if (!column_defaults.empty())
            reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
    }

    String getName() const override
    {
        return name;
    }
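
    /// Returns the next chunk produced by the input format, with the virtual columns appended
    /// when they were requested; an empty chunk signals the end of the object.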
    Chunk generate() override
    {
        if (!reader)
            return {};

        if (!initialized)
        {
            reader->readPrefix();
            initialized = true;
        }

        if (auto block = reader->read())
        {
            auto columns = block.getColumns();
            UInt64 num_rows = block.rows();

            if (with_path_column)
                columns.push_back(DataTypeString().createColumnConst(num_rows, file_path)->convertToFullColumnIfConst());
            if (with_file_column)
            {
                size_t last_slash_pos = file_path.find_last_of('/');
                columns.push_back(DataTypeString().createColumnConst(num_rows, file_path.substr(
                    last_slash_pos + 1))->convertToFullColumnIfConst());
            }

            return Chunk(std::move(columns), num_rows);
        }

        reader->readSuffix();
        reader.reset();
        return {};
    }

private:
    String name;
    std::unique_ptr<ReadBuffer> read_buf;
    BlockInputStreamPtr reader;
    bool initialized = false;
    bool with_file_column = false;
    bool with_path_column = false;
    String file_path;
};
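
/** Writes blocks into a single S3 object: rows are serialised with the requested output
  * format into a WriteBufferFromS3, which uploads the data in parts of at least
  * min_upload_part_size bytes.
  */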
class StorageS3BlockOutputStream : public IBlockOutputStream
{
public:
    StorageS3BlockOutputStream(
        const String & format,
        UInt64 min_upload_part_size,
        const Block & sample_block_,
        const Context & context,
        const CompressionMethod compression_method,
        const std::shared_ptr<Aws::S3::S3Client> & client,
        const String & bucket,
        const String & key)
        : sample_block(sample_block_)
    {
        write_buf = wrapWriteBufferWithCompressionMethod(
            std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size), compression_method, 3);
        writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
    }

    Block getHeader() const override
    {
        return sample_block;
    }

    void write(const Block & block) override
    {
        writer->write(block);
    }

    void writePrefix() override
    {
        writer->writePrefix();
    }

    void writeSuffix() override
    {
        writer->writeSuffix();
        writer->flush();
        write_buf->finalize();
    }

private:
    Block sample_block;
    std::unique_ptr<WriteBuffer> write_buf;
    BlockOutputStreamPtr writer;
};
}
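

/** The storage registers the virtual columns _path and _file, validates the URL against
  * the remote host filter and creates the S3 client used by both reads and writes.
  */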
StorageS3::StorageS3(
    const S3::URI & uri_,
    const String & access_key_id_,
    const String & secret_access_key_,
    const StorageID & table_id_,
    const String & format_name_,
    UInt64 min_upload_part_size_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    Context & context_,
    const String & compression_method_ = "")
    : IStorage(table_id_,
        ColumnsDescription(
            {
                {"_path", std::make_shared<DataTypeString>()},
                {"_file", std::make_shared<DataTypeString>()}
            },
            true))
    , uri(uri_)
    , context_global(context_)
    , format_name(format_name_)
    , min_upload_part_size(min_upload_part_size_)
    , compression_method(compression_method_)
    , client(S3::ClientFactory::instance().create(uri_.endpoint, access_key_id_, secret_access_key_))
{
    context_global.getRemoteHostFilter().checkURL(uri_.uri);
    setColumns(columns_);
    setConstraints(constraints_);
}


namespace
{

/* "Recursive" directory listing with matched paths as a result.
 * The same method exists in StorageFile.
 */
Strings listFilesWithRegexpMatching(Aws::S3::S3Client & client, const S3::URI & globbed_uri)
{
    if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos)
    {
        throw Exception("Expression can not have wildcards inside bucket name", ErrorCodes::UNEXPECTED_EXPRESSION);
    }

    const String key_prefix = globbed_uri.key.substr(0, globbed_uri.key.find_first_of("*?{"));

    /// The key contains no globs, so it can be returned as is.
    if (key_prefix.size() == globbed_uri.key.size())
    {
        return {globbed_uri.key};
    }

    Aws::S3::Model::ListObjectsRequest request;
    request.SetBucket(globbed_uri.bucket);
    request.SetPrefix(key_prefix);

    re2::RE2 matcher(makeRegexpPatternFromGlobs(globbed_uri.key));
    Strings result;
    Aws::S3::Model::ListObjectsOutcome outcome;
    int page = 0;
    do
    {
        ++page;
        outcome = client.ListObjects(request);
        if (!outcome.IsSuccess())
        {
            throw Exception("Could not list objects in bucket " + quoteString(request.GetBucket())
                + " with prefix " + quoteString(request.GetPrefix())
                + ", page " + std::to_string(page), ErrorCodes::S3_ERROR);
        }

        for (const auto & row : outcome.GetResult().GetContents())
        {
            String key = row.GetKey();
            if (re2::RE2::FullMatch(key, matcher))
                result.emplace_back(std::move(key));
        }

        /// The listing is paginated: continue from the marker returned by the previous page.
        request.SetMarker(outcome.GetResult().GetNextMarker());
    }
    while (outcome.GetResult().GetIsTruncated());

    return result;
}

}
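

/** Reads from S3: the key may contain globs, so it is first expanded into the list of
  * matching objects; one StorageS3Source pipe is created per object and the resulting
  * pipes are narrowed down to the requested number of streams.
  */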
Pipes StorageS3::read(
    const Names & column_names,
    const SelectQueryInfo & /*query_info*/,
    const Context & context,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t max_block_size,
    unsigned num_streams)
{
    Pipes pipes;
    bool need_path_column = false;
    bool need_file_column = false;
    for (const auto & column : column_names)
    {
        if (column == "_path")
            need_path_column = true;
        if (column == "_file")
            need_file_column = true;
    }

    for (const String & key : listFilesWithRegexpMatching(*client, uri))
        pipes.emplace_back(std::make_shared<StorageS3Source>(
            need_path_column,
            need_file_column,
            format_name,
            getName(),
            getHeaderBlock(column_names),
            context,
            getColumns().getDefaults(),
            max_block_size,
            chooseCompressionMethod(uri.endpoint, compression_method),
            client,
            uri.bucket,
            key));

    return narrowPipes(std::move(pipes), num_streams);
}
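
/// Writes go to the single object named by the table's key; unlike reads, the key is not
/// treated as a glob here, so one output stream is enough.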
BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const Context & /*context*/)
{
    return std::make_shared<StorageS3BlockOutputStream>(
        format_name, min_upload_part_size, getSampleBlock(), context_global,
        chooseCompressionMethod(uri.endpoint, compression_method),
        client, uri.bucket, uri.key);
}
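
/** Registers the S3 table engine. Illustrative usage (the URL, bucket and columns are examples):
  *
  *   CREATE TABLE s3_data (name String, value UInt32)
  *   ENGINE = S3('https://s3.example.com/my-bucket/data.csv.gz', 'CSV', 'gzip');
  *
  * Credentials may also be passed explicitly:
  *   ENGINE = S3(url, access_key_id, secret_access_key, format[, compression_method])
  */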
void registerStorageS3(StorageFactory & factory)
{
    factory.registerStorage("S3", [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;

        if (engine_args.size() < 2 || engine_args.size() > 5)
            throw Exception(
                "Storage S3 requires 2 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        for (size_t i = 0; i < engine_args.size(); ++i)
            engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context);

        String url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
        Poco::URI uri(url);
        S3::URI s3_uri(uri);

        String access_key_id;
        String secret_access_key;
        if (engine_args.size() >= 4)
        {
            access_key_id = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
            secret_access_key = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
        }

        UInt64 min_upload_part_size = args.local_context.getSettingsRef().s3_min_upload_part_size;

        /// With 3 or 5 arguments the last one is the compression method and the format name precedes it.
        String format_name;
        String compression_method;
        if (engine_args.size() == 3 || engine_args.size() == 5)
        {
            compression_method = engine_args.back()->as<ASTLiteral &>().value.safeGet<String>();
            format_name = engine_args[engine_args.size() - 2]->as<ASTLiteral &>().value.safeGet<String>();
        }
        else
        {
            compression_method = "auto";
            format_name = engine_args.back()->as<ASTLiteral &>().value.safeGet<String>();
        }

        return StorageS3::create(
            s3_uri, access_key_id, secret_access_key, args.table_id, format_name,
            min_upload_part_size, args.columns, args.constraints, args.context, compression_method);
    });
}
}

#endif