ClickHouse/src/Storages/StorageS3.h

183 lines
4.7 KiB
C++
Raw Normal View History

2019-05-23 09:03:39 +00:00
#pragma once
2019-12-06 14:37:21 +00:00
#include <Common/config.h>
#if USE_AWS_S3
2021-03-22 17:12:31 +00:00
#include <Core/Types.h>
#include <Compression/CompressionInfo.h>
#include <Storages/IStorage.h>
#include <Storages/StorageS3Settings.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Poco/URI.h>
#include <common/logger_useful.h>
#include <ext/shared_ptr_helper.h>
#include <IO/S3Common.h>
#include <IO/CompressionMethod.h>
#include <Interpreters/Context.h>

#include <memory>
#include <mutex>
#include <optional>
#include <utility>
2019-12-11 14:21:48 +00:00
/// This header only names S3Client through references and shared_ptrs,
/// so an incomplete (forward-declared) type is sufficient here.
namespace Aws
{
namespace S3
{
class S3Client;
}
}
2019-09-22 22:13:42 +00:00
2019-05-23 09:03:39 +00:00
namespace DB
{
2019-12-03 16:23:24 +00:00
2021-03-22 17:12:31 +00:00
class StorageS3SequentialSource;
class StorageS3Source : public SourceWithProgress
{
public:
2021-04-06 19:18:45 +00:00
class DisclosedGlobIterator
{
public:
DisclosedGlobIterator(Aws::S3::S3Client &, const S3::URI &);
std::optional<String> next();
private:
class Impl;
/// shared_ptr to have copy constructor
std::shared_ptr<Impl> pimpl;
};
2021-04-08 00:09:15 +00:00
struct FileIterator
{
virtual ~FileIterator() = default;
virtual std::optional<String> next() = 0;
};
2021-03-22 17:12:31 +00:00
static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column);
StorageS3Source(
bool need_path,
bool need_file,
const String & format,
String name_,
const Block & sample_block,
2021-04-08 00:09:15 +00:00
const Context & context_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
const String compression_hint_,
const std::shared_ptr<Aws::S3::S3Client> & client_,
2021-03-22 17:12:31 +00:00
const String & bucket,
2021-04-08 00:09:15 +00:00
std::shared_ptr<FileIterator> file_iterator_);
2021-03-22 17:12:31 +00:00
String getName() const override;
Chunk generate() override;
private:
String name;
2021-04-08 00:09:15 +00:00
String bucket;
String file_path;
String format;
Context context;
ColumnsDescription columns_desc;
UInt64 max_block_size;
String compression_hint;
std::shared_ptr<Aws::S3::S3Client> client;
Block sample_block;
2021-03-22 17:12:31 +00:00
std::unique_ptr<ReadBuffer> read_buf;
BlockInputStreamPtr reader;
bool initialized = false;
bool with_file_column = false;
bool with_path_column = false;
2021-04-08 00:09:15 +00:00
std::shared_ptr<FileIterator> file_iterator;
/// Recreate ReadBuffer and BlockInputStream for each file.
bool initialize();
2021-03-22 17:12:31 +00:00
};
2019-05-31 07:27:14 +00:00
/**
 * This class represents a table engine for external S3 URLs.
 * It sends an HTTP GET request to the server when SELECT is executed and
 * an HTTP PUT request when INSERT is executed.
 */
class StorageS3 : public ext::shared_ptr_helper<StorageS3>, public IStorage, WithContext
{
public:
    StorageS3(const S3::URI & uri,
        const String & access_key_id,
        const String & secret_access_key,
        const StorageID & table_id_,
        const String & format_name_,
        UInt64 min_upload_part_size_,
        UInt64 max_single_part_upload_size_,
        UInt64 max_connections_,
        const ColumnsDescription & columns_,
        const ConstraintsDescription & constraints_,
        ContextPtr context_,
        const String & compression_method_ = "");

    String getName() const override
    {
        return name;
    }

    Pipe read(
        const Names & column_names,
        const StorageMetadataPtr & /*metadata_snapshot*/,
        SelectQueryInfo & query_info,
        ContextPtr context,
        QueryProcessingStage::Enum processed_stage,
        size_t max_block_size,
        unsigned num_streams) override;

    BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;

    NamesAndTypesList getVirtuals() const override;

private:
    friend class StorageS3Distributed;
    friend class TableFunctionS3Distributed;
    friend class StorageS3SequentialSource;

    /// Connection parameters and the S3 client built from them.
    /// Only `client` and `auth_settings` are non-const, so they are the fields
    /// updateClientAndAuthSettings() is able to refresh.
    /// NOTE: the type name is misspelled ("Authentificaiton"); it is kept as-is
    /// because renaming it would require updating every user (the .cpp and
    /// possibly the friend classes above).
    struct ClientAuthentificaiton
    {
        const S3::URI uri;
        const String access_key_id;
        const String secret_access_key;
        const UInt64 max_connections;
        std::shared_ptr<Aws::S3::S3Client> client;
        S3AuthSettings auth_settings;
    };

    ClientAuthentificaiton client_auth;

    String format_name;
    size_t min_upload_part_size;
    size_t max_single_part_upload_size;
    String compression_method;
    String name;

    /// FileIterator that hands out keys from a DisclosedGlobIterator under a mutex,
    /// so one instance can be shared by several StorageS3Source objects.
    struct LocalFileIterator : public StorageS3Source::FileIterator
    {
        /// Takes the glob iterator by value and moves it into place; the copy
        /// would cost an extra atomic refcount increment on its shared pimpl.
        explicit LocalFileIterator(StorageS3Source::DisclosedGlobIterator glob_iterator_)
            : glob_iterator(std::move(glob_iterator_)) {}

        StorageS3Source::DisclosedGlobIterator glob_iterator;

        /// Several files could be processed in parallel
        /// from different sources, so next() serializes access to glob_iterator.
        std::mutex iterator_mutex;

        std::optional<String> next() override
        {
            std::lock_guard lock(iterator_mutex);
            return glob_iterator.next();
        }
    };

    static void updateClientAndAuthSettings(ContextPtr, ClientAuthentificaiton &);
};
2019-12-11 14:21:48 +00:00
2019-05-23 09:03:39 +00:00
}
2019-12-11 14:21:48 +00:00
#endif