#pragma once #include #if USE_AWS_S3 #include #include #include #include #include #include #include #include #include #include #include namespace Aws::S3 { class S3Client; } namespace DB { class StorageS3SequentialSource; class StorageS3Source : public SourceWithProgress, WithContext { public: class DisclosedGlobIterator { public: DisclosedGlobIterator(Aws::S3::S3Client &, const S3::URI &); String next(); private: class Impl; /// shared_ptr to have copy constructor std::shared_ptr pimpl; }; using IteratorWrapper = std::function; static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column); StorageS3Source( bool need_path, bool need_file, const String & format, String name_, const Block & sample_block, ContextPtr context_, const ColumnsDescription & columns_, UInt64 max_block_size_, Int64 s3_max_single_read_retries_, const String compression_hint_, const std::shared_ptr & client_, const String & bucket, std::shared_ptr file_iterator_); String getName() const override; Chunk generate() override; private: String name; String bucket; String file_path; String format; ColumnsDescription columns_desc; UInt64 max_block_size; Int64 s3_max_single_read_retries; String compression_hint; std::shared_ptr client; Block sample_block; std::unique_ptr read_buf; BlockInputStreamPtr reader; bool initialized = false; bool with_file_column = false; bool with_path_column = false; std::shared_ptr file_iterator; /// Recreate ReadBuffer and BlockInputStream for each file. bool initialize(); }; /** * This class represents table engine for external S3 urls. * It sends HTTP GET to server when select is called and * HTTP PUT when insert is called. 
*/
/// NOTE(review): `ext::shared_ptr_helper` is named below without a template
/// argument; presumably it is parameterised on StorageS3 itself — confirm and restore.
/// IStorage is a public base; WithContext has no access specifier, so with the
/// `class` key it is a private base.
class StorageS3 : public ext::shared_ptr_helper, public IStorage, WithContext
{
public:
    /// `compression_method_` defaults to an empty string and
    /// `distributed_processing_` defaults to false.
    /// The upload-size parameters are UInt64 here, while the corresponding members
    /// below are size_t.
    StorageS3(const S3::URI & uri,
        const String & access_key_id,
        const String & secret_access_key,
        const StorageID & table_id_,
        const String & format_name_,
        Int64 s3_max_single_read_retries_,
        UInt64 min_upload_part_size_,
        UInt64 max_single_part_upload_size_,
        UInt64 max_connections_,
        const ColumnsDescription & columns_,
        const ConstraintsDescription & constraints_,
        ContextPtr context_,
        const String & compression_method_ = "",
        bool distributed_processing_ = false);

    /// Returns the `name` member.
    String getName() const override
    {
        return name;
    }

    /// Read entry point; the metadata snapshot parameter is intentionally unnamed.
    Pipe read(
        const Names & column_names,
        const StorageMetadataPtr & /*metadata_snapshot*/,
        SelectQueryInfo & query_info,
        ContextPtr context,
        QueryProcessingStage::Enum processed_stage,
        size_t max_block_size,
        unsigned num_streams) override;

    /// Write entry point; the metadata snapshot parameter is intentionally unnamed.
    BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;

    /// Presumably the virtual columns exposed by this storage — verify against the definition.
    NamesAndTypesList getVirtuals() const override;

private:
    /// These two classes get access to the private members declared below.
    friend class StorageS3Cluster;
    friend class TableFunctionS3Cluster;

    /// Bundles the connection parameters with the client and its auth settings.
    /// The URI, both keys and max_connections are const; `client` and
    /// `auth_settings` are the only non-const members.
    /// NOTE(review): the struct name is misspelled ("Authentificaiton"). It is left
    /// untouched because the friend classes above may refer to it by this name.
    struct ClientAuthentificaiton
    {
        const S3::URI uri;
        const String access_key_id;
        const String secret_access_key;
        const UInt64 max_connections;
        /// NOTE(review): template argument missing; presumably an S3 client type — confirm.
        std::shared_ptr client;
        S3AuthSettings auth_settings;
    };

    ClientAuthentificaiton client_auth;

    String format_name;
    Int64 s3_max_single_read_retries;
    size_t min_upload_part_size;
    size_t max_single_part_upload_size;
    String compression_method;
    String name;
    const bool distributed_processing;

    /// Takes the authentication bundle by non-const reference, so presumably it
    /// updates `client` and/or `auth_settings` in place — verify against the definition.
    static void updateClientAndAuthSettings(ContextPtr, ClientAuthentificaiton &);
};

}

#endif