2019-12-06 14:37:21 +00:00
# include <Common/config.h>
# if USE_AWS_S3
2019-12-03 16:23:24 +00:00
# include <IO/S3Common.h>
2019-05-23 09:03:39 +00:00
# include <Storages/StorageFactory.h>
2019-05-31 07:27:14 +00:00
# include <Storages/StorageS3.h>
2020-06-01 17:16:09 +00:00
# include <Storages/StorageS3Settings.h>
2019-05-23 09:03:39 +00:00
# include <Interpreters/Context.h>
# include <Interpreters/evaluateConstantExpression.h>
# include <Parsers/ASTLiteral.h>
2019-06-01 21:18:20 +00:00
# include <IO/ReadBufferFromS3.h>
2019-11-20 14:48:01 +00:00
# include <IO/ReadHelpers.h>
2019-06-01 21:18:20 +00:00
# include <IO/WriteBufferFromS3.h>
2019-11-20 14:48:01 +00:00
# include <IO/WriteHelpers.h>
2019-05-23 09:03:39 +00:00
# include <Formats/FormatFactory.h>
2019-05-31 07:27:14 +00:00
2019-05-23 09:03:39 +00:00
# include <DataStreams/IBlockOutputStream.h>
# include <DataStreams/AddingDefaultsBlockInputStream.h>
2020-01-26 13:12:15 +00:00
# include <DataStreams/narrowBlockInputStreams.h>
2019-05-23 09:03:39 +00:00
2020-01-26 14:05:51 +00:00
# include <DataTypes/DataTypeString.h>
2020-06-01 17:16:09 +00:00
# include <aws/core/auth/AWSCredentials.h>
2019-12-03 16:23:24 +00:00
# include <aws/s3/S3Client.h>
2020-05-25 09:26:50 +00:00
# include <aws/s3/model/ListObjectsV2Request.h>
2020-01-26 13:03:47 +00:00
# include <Common/parseGlobs.h>
2020-02-17 19:28:25 +00:00
# include <Common/quoteString.h>
2020-01-26 13:03:47 +00:00
# include <re2/re2.h>
2019-05-23 09:03:39 +00:00
2020-02-03 18:01:41 +00:00
# include <Processors/Sources/SourceWithProgress.h>
# include <Processors/Pipe.h>
2019-05-23 09:03:39 +00:00
2019-05-31 07:27:14 +00:00
2019-05-23 09:03:39 +00:00
namespace DB
{
namespace ErrorCodes
{
    /// Error codes thrown from this file. Only declared here (extern); the definitions live elsewhere.
    extern const int S3_ERROR;
    extern const int UNEXPECTED_EXPRESSION;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
2019-12-03 16:23:24 +00:00
2019-05-31 07:27:14 +00:00
namespace
2019-05-23 09:03:39 +00:00
{
2020-02-03 18:01:41 +00:00
class StorageS3Source : public SourceWithProgress
2019-05-23 09:03:39 +00:00
{
2019-05-31 07:27:14 +00:00
public :
2020-02-14 17:47:39 +00:00
static Block getHeader ( Block sample_block , bool with_path_column , bool with_file_column )
{
if ( with_path_column )
sample_block . insert ( { DataTypeString ( ) . createColumn ( ) , std : : make_shared < DataTypeString > ( ) , " _path " } ) ;
if ( with_file_column )
sample_block . insert ( { DataTypeString ( ) . createColumn ( ) , std : : make_shared < DataTypeString > ( ) , " _file " } ) ;
return sample_block ;
}
2020-02-03 18:01:41 +00:00
StorageS3Source (
2020-01-26 14:05:51 +00:00
bool need_path ,
bool need_file ,
2019-05-31 07:27:14 +00:00
const String & format ,
2020-02-03 18:01:41 +00:00
String name_ ,
2019-05-31 07:27:14 +00:00
const Block & sample_block ,
const Context & context ,
2020-10-02 12:38:50 +00:00
const ColumnsDescription & columns ,
2019-05-31 07:27:14 +00:00
UInt64 max_block_size ,
2019-12-03 16:23:24 +00:00
const CompressionMethod compression_method ,
const std : : shared_ptr < Aws : : S3 : : S3Client > & client ,
const String & bucket ,
const String & key )
2020-02-14 17:50:27 +00:00
: SourceWithProgress ( getHeader ( sample_block , need_path , need_file ) )
2020-02-14 17:47:39 +00:00
, name ( std : : move ( name_ ) )
2020-01-26 14:05:51 +00:00
, with_file_column ( need_file )
, with_path_column ( need_path )
, file_path ( bucket + " / " + key )
2019-05-31 07:27:14 +00:00
{
2020-01-04 07:31:00 +00:00
read_buf = wrapReadBufferWithCompressionMethod ( std : : make_unique < ReadBufferFromS3 > ( client , bucket , key ) , compression_method ) ;
2019-05-31 07:27:14 +00:00
reader = FormatFactory : : instance ( ) . getInput ( format , * read_buf , sample_block , context , max_block_size ) ;
2020-02-03 18:01:41 +00:00
2020-10-02 12:38:50 +00:00
if ( columns . hasDefaults ( ) )
reader = std : : make_shared < AddingDefaultsBlockInputStream > ( reader , columns , context ) ;
2019-05-31 07:27:14 +00:00
}
2019-05-23 09:03:39 +00:00
2019-05-31 07:27:14 +00:00
String getName ( ) const override
{
return name ;
}
2019-05-23 09:03:39 +00:00
2020-02-03 18:01:41 +00:00
Chunk generate ( ) override
2019-05-31 07:27:14 +00:00
{
2020-02-03 18:01:41 +00:00
if ( ! reader )
return { } ;
2019-05-23 09:03:39 +00:00
2020-02-03 18:01:41 +00:00
if ( ! initialized )
{
reader - > readSuffix ( ) ;
initialized = true ;
}
2019-05-23 09:03:39 +00:00
2020-02-03 18:01:41 +00:00
if ( auto block = reader - > read ( ) )
2020-01-26 14:05:51 +00:00
{
2020-02-14 17:47:39 +00:00
auto columns = block . getColumns ( ) ;
2020-02-14 17:50:27 +00:00
UInt64 num_rows = block . rows ( ) ;
2020-02-14 17:47:39 +00:00
2020-01-26 14:05:51 +00:00
if ( with_path_column )
2020-02-14 17:50:27 +00:00
columns . push_back ( DataTypeString ( ) . createColumnConst ( num_rows , file_path ) - > convertToFullColumnIfConst ( ) ) ;
2020-01-26 14:05:51 +00:00
if ( with_file_column )
{
size_t last_slash_pos = file_path . find_last_of ( ' / ' ) ;
2020-02-14 17:50:27 +00:00
columns . push_back ( DataTypeString ( ) . createColumnConst ( num_rows , file_path . substr (
2020-02-14 17:47:39 +00:00
last_slash_pos + 1 ) ) - > convertToFullColumnIfConst ( ) ) ;
2020-01-26 14:05:51 +00:00
}
2019-05-23 09:03:39 +00:00
2020-02-14 17:47:39 +00:00
return Chunk ( std : : move ( columns ) , num_rows ) ;
2020-01-26 14:05:51 +00:00
}
2019-05-23 09:03:39 +00:00
2020-02-03 18:01:41 +00:00
reader . reset ( ) ;
return { } ;
2019-05-31 07:27:14 +00:00
}
private :
String name ;
2019-11-19 12:46:07 +00:00
std : : unique_ptr < ReadBuffer > read_buf ;
2019-05-31 07:27:14 +00:00
BlockInputStreamPtr reader ;
2020-02-03 18:01:41 +00:00
bool initialized = false ;
2020-01-26 14:05:51 +00:00
bool with_file_column = false ;
bool with_path_column = false ;
String file_path ;
2019-05-31 07:27:14 +00:00
} ;
class StorageS3BlockOutputStream : public IBlockOutputStream
2019-05-23 09:03:39 +00:00
{
2019-05-31 07:27:14 +00:00
public :
2019-12-03 16:23:24 +00:00
StorageS3BlockOutputStream (
2019-05-31 07:27:14 +00:00
const String & format ,
const Block & sample_block_ ,
const Context & context ,
2019-12-03 16:23:24 +00:00
const CompressionMethod compression_method ,
const std : : shared_ptr < Aws : : S3 : : S3Client > & client ,
const String & bucket ,
2020-12-09 14:09:04 +00:00
const String & key ,
size_t min_upload_part_size ,
size_t max_single_part_upload_size )
2019-05-31 07:27:14 +00:00
: sample_block ( sample_block_ )
{
2020-01-04 07:31:00 +00:00
write_buf = wrapWriteBufferWithCompressionMethod (
2020-12-09 14:09:04 +00:00
std : : make_unique < WriteBufferFromS3 > ( client , bucket , key , min_upload_part_size , max_single_part_upload_size ) , compression_method , 3 ) ;
2019-05-31 07:27:14 +00:00
writer = FormatFactory : : instance ( ) . getOutput ( format , * write_buf , sample_block , context ) ;
}
Block getHeader ( ) const override
{
return sample_block ;
}
void write ( const Block & block ) override
{
writer - > write ( block ) ;
}
void writePrefix ( ) override
{
writer - > writePrefix ( ) ;
}
void writeSuffix ( ) override
{
writer - > writeSuffix ( ) ;
writer - > flush ( ) ;
write_buf - > finalize ( ) ;
}
private :
Block sample_block ;
2019-11-19 12:46:07 +00:00
std : : unique_ptr < WriteBuffer > write_buf ;
2019-05-31 07:27:14 +00:00
BlockOutputStreamPtr writer ;
} ;
}
2019-05-23 09:03:39 +00:00
2019-12-04 16:06:55 +00:00
/// Constructs the storage: checks the URL with the remote host filter, stores columns and
/// constraints as in-memory metadata, and creates the S3 client.
/// Credentials passed explicitly take precedence; when `access_key_id_` is empty the ones from
/// the per-endpoint S3 settings are used instead.
StorageS3::StorageS3(
    const S3::URI & uri_,
    const String & access_key_id_,
    const String & secret_access_key_,
    const StorageID & table_id_,
    const String & format_name_,
    UInt64 min_upload_part_size_,
    UInt64 max_single_part_upload_size_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const Context & context_,
    const String & compression_method_)
    : IStorage(table_id_)
    , uri(uri_)
    , global_context(context_.getGlobalContext())
    , format_name(format_name_)
    , min_upload_part_size(min_upload_part_size_)
    , max_single_part_upload_size(max_single_part_upload_size_)
    , compression_method(compression_method_)
    , name(uri_.storage_name)
{
    global_context.getRemoteHostFilter().checkURL(uri_.uri);

    StorageInMemoryMetadata storage_metadata;
    storage_metadata.setColumns(columns_);
    storage_metadata.setConstraints(constraints_);
    setInMemoryMetadata(storage_metadata);

    auto settings = context_.getStorageS3Settings().getSettings(uri.endpoint);

    /// Fall back to the credentials from the endpoint settings only when no access key was given.
    Aws::Auth::AWSCredentials credentials = access_key_id_.empty()
        ? Aws::Auth::AWSCredentials(std::move(settings.access_key_id), std::move(settings.secret_access_key))
        : Aws::Auth::AWSCredentials(access_key_id_, secret_access_key_);

    /// The per-endpoint setting wins; otherwise the server config value (default: false) is used.
    const auto use_environment_credentials = settings.use_environment_credentials.value_or(
        global_context.getConfigRef().getBool("s3.use_environment_credentials", false));

    client = S3::ClientFactory::instance().create(
        uri_.endpoint,
        uri_.is_virtual_hosted_style,
        credentials.GetAWSAccessKeyId(),
        credentials.GetAWSSecretKey(),
        std::move(settings.headers),
        use_environment_credentials,
        context_.getRemoteHostFilter(),
        context_.getGlobalContext().getSettingsRef().s3_max_redirects);
}
2020-01-27 20:08:43 +00:00
namespace
{
2020-06-01 17:16:09 +00:00
/* "Recursive" directory listing with matched paths as a result.
2020-01-26 13:03:47 +00:00
* Have the same method in StorageFile .
*/
2020-01-27 20:08:43 +00:00
Strings listFilesWithRegexpMatching ( Aws : : S3 : : S3Client & client , const S3 : : URI & globbed_uri )
2020-01-26 13:03:47 +00:00
{
if ( globbed_uri . bucket . find_first_of ( " *?{ " ) ! = globbed_uri . bucket . npos )
{
2020-01-30 06:17:55 +00:00
throw Exception ( " Expression can not have wildcards inside bucket name " , ErrorCodes : : UNEXPECTED_EXPRESSION ) ;
2020-01-26 13:03:47 +00:00
}
const String key_prefix = globbed_uri . key . substr ( 0 , globbed_uri . key . find_first_of ( " *?{ " ) ) ;
if ( key_prefix . size ( ) = = globbed_uri . key . size ( ) )
{
return { globbed_uri . key } ;
}
2020-05-25 09:26:50 +00:00
Aws : : S3 : : Model : : ListObjectsV2Request request ;
2020-01-26 13:03:47 +00:00
request . SetBucket ( globbed_uri . bucket ) ;
request . SetPrefix ( key_prefix ) ;
re2 : : RE2 matcher ( makeRegexpPatternFromGlobs ( globbed_uri . key ) ) ;
Strings result ;
2020-05-25 09:26:50 +00:00
Aws : : S3 : : Model : : ListObjectsV2Outcome outcome ;
2020-01-30 06:17:55 +00:00
int page = 0 ;
2020-01-26 13:03:47 +00:00
do
{
2020-01-30 06:17:55 +00:00
+ + page ;
2020-05-25 09:26:50 +00:00
outcome = client . ListObjectsV2 ( request ) ;
2020-01-26 13:03:47 +00:00
if ( ! outcome . IsSuccess ( ) )
{
2020-05-25 07:30:48 +00:00
if ( page > 1 )
2020-11-09 19:07:38 +00:00
throw Exception ( ErrorCodes : : S3_ERROR , " Could not list objects in bucket {} with prefix {}, page {}, S3 exception: {}, message: {} " ,
quoteString ( request . GetBucket ( ) ) , quoteString ( request . GetPrefix ( ) ) , page ,
backQuote ( outcome . GetError ( ) . GetExceptionName ( ) ) , quoteString ( outcome . GetError ( ) . GetMessage ( ) ) ) ;
2020-05-25 07:30:48 +00:00
2020-11-09 19:07:38 +00:00
throw Exception ( ErrorCodes : : S3_ERROR , " Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {} " ,
quoteString ( request . GetBucket ( ) ) , quoteString ( request . GetPrefix ( ) ) ,
backQuote ( outcome . GetError ( ) . GetExceptionName ( ) ) , quoteString ( outcome . GetError ( ) . GetMessage ( ) ) ) ;
2020-01-26 13:03:47 +00:00
}
for ( const auto & row : outcome . GetResult ( ) . GetContents ( ) )
{
String key = row . GetKey ( ) ;
if ( re2 : : RE2 : : FullMatch ( key , matcher ) )
result . emplace_back ( std : : move ( key ) ) ;
}
2020-05-25 09:26:50 +00:00
request . SetContinuationToken ( outcome . GetResult ( ) . GetNextContinuationToken ( ) ) ;
2020-01-26 13:03:47 +00:00
}
while ( outcome . GetResult ( ) . GetIsTruncated ( ) ) ;
return result ;
}
}
2020-08-03 13:54:14 +00:00
/// Builds the reading pipe: one StorageS3Source per object whose key matches the (possibly globbed)
/// URI key, united into a single pipe that is narrowed to at most `num_streams` outputs.
/// The virtual columns `_path` and `_file` are produced only when they are requested in `column_names`.
Pipe StorageS3::read(
    const Names & column_names,
    const StorageMetadataPtr & metadata_snapshot,
    SelectQueryInfo & /*query_info*/,
    const Context & context,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t max_block_size,
    unsigned num_streams)
{
    Pipes pipes;
    bool need_path_column = false;
    bool need_file_column = false;
    for (const auto & column : column_names)
    {
        if (column == "_path")
            need_path_column = true;
        if (column == "_file")
            need_file_column = true;
    }

    for (const String & key : listFilesWithRegexpMatching(*client, uri))
        pipes.emplace_back(std::make_shared<StorageS3Source>(
            need_path_column,
            need_file_column,
            format_name,
            getName(),
            metadata_snapshot->getSampleBlock(),
            context,
            metadata_snapshot->getColumns(),
            max_block_size,
            /// Detect the compression from the object key (the file name), not from the service
            /// endpoint: the endpoint says nothing about how a particular object is compressed.
            chooseCompressionMethod(key, compression_method),
            client,
            uri.bucket,
            key));

    auto pipe = Pipe::unitePipes(std::move(pipes));
    // It's possible to have many buckets read from s3, resize(num_streams) might open too many handles at the same time.
    // Using narrowPipe instead.
    narrowPipe(pipe, num_streams);
    return pipe;
}
2020-06-16 14:25:08 +00:00
/// Creates an output stream that writes blocks in the table's format to the object `uri.key`
/// of bucket `uri.bucket`, using the upload-size limits the storage was created with.
BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
    return std::make_shared<StorageS3BlockOutputStream>(
        format_name,
        metadata_snapshot->getSampleBlock(),
        global_context,
        /// Detect the compression from the object key (the file name), not from the service endpoint.
        chooseCompressionMethod(uri.key, compression_method),
        client,
        uri.bucket,
        uri.key,
        min_upload_part_size,
        max_single_part_upload_size);
}
2020-07-13 14:13:30 +00:00
void registerStorageS3Impl ( const String & name , StorageFactory & factory )
2019-05-23 09:03:39 +00:00
{
2020-07-13 14:13:30 +00:00
factory . registerStorage ( name , [ ] ( const StorageFactory : : Arguments & args )
2019-06-17 00:06:14 +00:00
{
ASTs & engine_args = args . engine_args ;
2019-11-04 19:57:03 +00:00
if ( engine_args . size ( ) < 2 | | engine_args . size ( ) > 5 )
2019-06-17 00:06:14 +00:00
throw Exception (
2019-11-04 19:57:03 +00:00
" Storage S3 requires 2 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method]. " , ErrorCodes : : NUMBER_OF_ARGUMENTS_DOESNT_MATCH ) ;
2019-06-17 00:06:14 +00:00
2020-03-09 01:22:33 +00:00
for ( auto & engine_arg : engine_args )
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral ( engine_arg , args . local_context ) ;
2019-06-17 00:06:14 +00:00
String url = engine_args [ 0 ] - > as < ASTLiteral & > ( ) . value . safeGet < String > ( ) ;
2019-12-06 14:37:21 +00:00
Poco : : URI uri ( url ) ;
S3 : : URI s3_uri ( uri ) ;
2019-06-17 00:06:14 +00:00
2019-11-04 19:57:03 +00:00
String access_key_id ;
String secret_access_key ;
if ( engine_args . size ( ) > = 4 )
{
access_key_id = engine_args [ 1 ] - > as < ASTLiteral & > ( ) . value . safeGet < String > ( ) ;
secret_access_key = engine_args [ 2 ] - > as < ASTLiteral & > ( ) . value . safeGet < String > ( ) ;
}
2019-06-17 00:06:14 +00:00
2019-09-23 12:41:59 +00:00
UInt64 min_upload_part_size = args . local_context . getSettingsRef ( ) . s3_min_upload_part_size ;
2020-12-09 14:09:04 +00:00
UInt64 max_single_part_upload_size = args . local_context . getSettingsRef ( ) . s3_max_single_part_upload_size ;
2019-09-23 12:41:59 +00:00
2019-11-19 12:46:07 +00:00
String compression_method ;
2020-09-28 07:37:24 +00:00
String format_name ;
2019-11-04 19:57:03 +00:00
if ( engine_args . size ( ) = = 3 | | engine_args . size ( ) = = 5 )
2020-09-28 07:37:24 +00:00
{
2019-11-04 19:57:03 +00:00
compression_method = engine_args . back ( ) - > as < ASTLiteral & > ( ) . value . safeGet < String > ( ) ;
2020-09-28 07:37:24 +00:00
format_name = engine_args [ engine_args . size ( ) - 2 ] - > as < ASTLiteral & > ( ) . value . safeGet < String > ( ) ;
}
2019-11-04 19:57:03 +00:00
else
2020-09-28 07:37:24 +00:00
{
2019-11-04 19:57:03 +00:00
compression_method = " auto " ;
2020-09-28 07:37:24 +00:00
format_name = engine_args . back ( ) - > as < ASTLiteral & > ( ) . value . safeGet < String > ( ) ;
}
2019-11-19 12:46:07 +00:00
2020-09-30 12:04:21 +00:00
return StorageS3 : : create (
s3_uri ,
access_key_id ,
secret_access_key ,
args . table_id ,
format_name ,
min_upload_part_size ,
2020-12-09 14:09:04 +00:00
max_single_part_upload_size ,
2020-09-30 12:04:21 +00:00
args . columns ,
args . constraints ,
args . context ,
compression_method
) ;
2020-04-06 05:19:40 +00:00
} ,
{
. source_access_type = AccessType : : S3 ,
2019-06-17 00:06:14 +00:00
} ) ;
2019-05-23 09:03:39 +00:00
}
2019-12-11 14:21:48 +00:00
2020-07-10 07:26:55 +00:00
/// Registers the table engine under the name "S3".
void registerStorageS3(StorageFactory & factory)
{
    registerStorageS3Impl("S3", factory);
}
/// Registers the same engine implementation under the name "COSN".
void registerStorageCOS(StorageFactory & factory)
{
    registerStorageS3Impl("COSN", factory);
}
2020-04-28 10:38:57 +00:00
/// Virtual columns provided by the storage: `_path` and `_file`, both of type String.
NamesAndTypesList StorageS3::getVirtuals() const
{
    NamesAndTypesList virtuals;
    virtuals.emplace_back("_path", std::make_shared<DataTypeString>());
    virtuals.emplace_back("_file", std::make_shared<DataTypeString>());
    return virtuals;
}
2019-05-23 09:03:39 +00:00
}
2019-12-11 14:21:48 +00:00
2019-12-09 12:36:06 +00:00
# endif