ClickHouse/src/Storages/StorageS3.h

166 lines
4.3 KiB
C++
Raw Normal View History

2019-05-23 09:03:39 +00:00
#pragma once
2019-12-06 14:37:21 +00:00
#include <Common/config.h>
#if USE_AWS_S3
2021-03-22 17:12:31 +00:00
#include <Core/Types.h>
#include <Compression/CompressionInfo.h>
2019-05-23 09:03:39 +00:00
#include <Storages/IStorage.h>
#include <Storages/StorageS3Settings.h>
2021-03-22 17:12:31 +00:00
#include <Processors/Sources/SourceWithProgress.h>
2019-05-29 12:54:31 +00:00
#include <Poco/URI.h>
2019-05-23 09:03:39 +00:00
#include <common/logger_useful.h>
#include <ext/shared_ptr_helper.h>
2021-03-16 18:41:29 +00:00
#include <IO/S3Common.h>
2021-03-22 17:12:31 +00:00
#include <IO/CompressionMethod.h>
2021-04-08 00:09:15 +00:00
#include <Interpreters/Context.h>
2019-12-11 14:21:48 +00:00
namespace Aws::S3
{
class S3Client;
}
2019-09-22 22:13:42 +00:00
2019-05-23 09:03:39 +00:00
namespace DB
{
2019-12-03 16:23:24 +00:00
2021-03-22 17:12:31 +00:00
class StorageS3SequentialSource;
2021-04-12 19:35:26 +00:00
class StorageS3Source : public SourceWithProgress, WithContext
2021-03-22 17:12:31 +00:00
{
public:
2021-04-06 19:18:45 +00:00
class DisclosedGlobIterator
{
public:
DisclosedGlobIterator(Aws::S3::S3Client &, const S3::URI &);
2021-04-13 10:59:02 +00:00
String next();
2021-04-06 19:18:45 +00:00
private:
class Impl;
/// shared_ptr to have copy constructor
std::shared_ptr<Impl> pimpl;
};
2021-04-13 10:59:02 +00:00
using IteratorWrapper = std::function<String()>;
2021-04-08 00:09:15 +00:00
2021-03-22 17:12:31 +00:00
static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column);
StorageS3Source(
bool need_path,
bool need_file,
const String & format,
String name_,
const Block & sample_block,
2021-04-12 19:35:26 +00:00
ContextPtr context_,
2021-04-08 00:09:15 +00:00
const ColumnsDescription & columns_,
UInt64 max_block_size_,
2021-05-19 21:42:25 +00:00
UInt64 max_single_read_retries_,
2021-06-01 14:23:46 +00:00
const String compression_hint_,
2021-04-08 00:09:15 +00:00
const std::shared_ptr<Aws::S3::S3Client> & client_,
2021-03-22 17:12:31 +00:00
const String & bucket,
2021-04-10 02:21:18 +00:00
std::shared_ptr<IteratorWrapper> file_iterator_);
2021-03-22 17:12:31 +00:00
String getName() const override;
Chunk generate() override;
private:
String name;
2021-04-08 00:09:15 +00:00
String bucket;
String file_path;
String format;
ColumnsDescription columns_desc;
UInt64 max_block_size;
2021-05-19 21:42:25 +00:00
UInt64 max_single_read_retries;
2021-04-08 00:09:15 +00:00
String compression_hint;
std::shared_ptr<Aws::S3::S3Client> client;
Block sample_block;
2021-03-22 17:12:31 +00:00
std::unique_ptr<ReadBuffer> read_buf;
BlockInputStreamPtr reader;
bool initialized = false;
bool with_file_column = false;
bool with_path_column = false;
2021-04-10 02:21:18 +00:00
std::shared_ptr<IteratorWrapper> file_iterator;
2021-04-08 00:09:15 +00:00
/// Recreate ReadBuffer and BlockInputStream for each file.
bool initialize();
2021-03-22 17:12:31 +00:00
};
2019-05-31 07:27:14 +00:00
/**
2019-06-01 21:18:20 +00:00
* This class represents table engine for external S3 urls.
2019-05-31 07:27:14 +00:00
* It sends HTTP GET to server when select is called and
2019-06-01 21:18:20 +00:00
* HTTP PUT when insert is called.
2019-05-31 07:27:14 +00:00
*/
class StorageS3 : public ext::shared_ptr_helper<StorageS3>, public IStorage, WithContext
2019-05-23 09:03:39 +00:00
{
public:
2021-04-23 12:18:23 +00:00
StorageS3(
const S3::URI & uri,
const String & access_key_id,
const String & secret_access_key,
2019-12-04 16:06:55 +00:00
const StorageID & table_id_,
2019-06-01 21:18:20 +00:00
const String & format_name_,
2021-05-19 21:42:25 +00:00
UInt64 max_single_read_retries_,
UInt64 min_upload_part_size_,
UInt64 max_single_part_upload_size_,
UInt64 max_connections_,
2019-06-01 21:18:20 +00:00
const ColumnsDescription & columns_,
2019-09-22 22:13:42 +00:00
const ConstraintsDescription & constraints_,
2021-04-23 12:18:23 +00:00
const String & comment,
ContextPtr context_,
2021-04-13 20:17:25 +00:00
const String & compression_method_ = "",
bool distributed_processing_ = false);
2019-06-01 21:18:20 +00:00
String getName() const override
{
return name;
2019-06-01 21:18:20 +00:00
}
2020-08-03 13:54:14 +00:00
Pipe read(
2019-09-22 22:13:42 +00:00
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
SelectQueryInfo & query_info,
ContextPtr context,
2019-05-23 09:03:39 +00:00
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
2019-05-23 09:03:39 +00:00
NamesAndTypesList getVirtuals() const override;
2020-04-27 13:55:30 +00:00
2019-09-22 22:13:42 +00:00
private:
2021-03-22 17:12:31 +00:00
2021-04-12 21:42:52 +00:00
friend class StorageS3Cluster;
friend class TableFunctionS3Cluster;
2021-03-25 13:49:07 +00:00
2021-03-16 18:41:29 +00:00
struct ClientAuthentificaiton
{
const S3::URI uri;
const String access_key_id;
const String secret_access_key;
const UInt64 max_connections;
std::shared_ptr<Aws::S3::S3Client> client;
S3AuthSettings auth_settings;
};
ClientAuthentificaiton client_auth;
2019-05-23 09:03:39 +00:00
2019-05-31 07:27:14 +00:00
String format_name;
2021-05-19 21:42:25 +00:00
UInt64 max_single_read_retries;
size_t min_upload_part_size;
size_t max_single_part_upload_size;
String compression_method;
String name;
2021-04-13 20:17:25 +00:00
const bool distributed_processing;
2021-03-16 18:41:29 +00:00
static void updateClientAndAuthSettings(ContextPtr, ClientAuthentificaiton &);
2019-05-23 09:03:39 +00:00
};
2019-12-11 14:21:48 +00:00
2019-05-23 09:03:39 +00:00
}
2019-12-11 14:21:48 +00:00
#endif