ClickHouse/src/Storages/StorageS3.h

228 lines
6.7 KiB
C++
Raw Normal View History

2019-05-23 09:03:39 +00:00
#pragma once
2019-12-06 14:37:21 +00:00
#include <Common/config.h>
#if USE_AWS_S3
2021-03-22 17:12:31 +00:00
#include <Core/Types.h>
#include <Compression/CompressionInfo.h>
2019-05-23 09:03:39 +00:00
#include <Storages/IStorage.h>
#include <Storages/StorageS3Settings.h>
2021-03-22 17:12:31 +00:00
#include <Processors/Sources/SourceWithProgress.h>
2019-05-29 12:54:31 +00:00
#include <Poco/URI.h>
2021-10-02 07:13:14 +00:00
#include <base/logger_useful.h>
#include <base/shared_ptr_helper.h>
2021-03-16 18:41:29 +00:00
#include <IO/S3Common.h>
2021-03-22 17:12:31 +00:00
#include <IO/CompressionMethod.h>
2021-04-08 00:09:15 +00:00
#include <Interpreters/Context.h>
2021-09-07 11:17:25 +00:00
#include <Storages/ExternalDataSourceConfiguration.h>
2019-12-11 14:21:48 +00:00
namespace Aws::S3
{
class S3Client;
}
2019-09-22 22:13:42 +00:00
2019-05-23 09:03:39 +00:00
namespace DB
{
2019-12-03 16:23:24 +00:00
2021-07-20 18:18:43 +00:00
class PullingPipelineExecutor;
2021-03-22 17:12:31 +00:00
class StorageS3SequentialSource;
2021-04-12 19:35:26 +00:00
class StorageS3Source : public SourceWithProgress, WithContext
2021-03-22 17:12:31 +00:00
{
public:
2021-04-06 19:18:45 +00:00
class DisclosedGlobIterator
{
public:
DisclosedGlobIterator(Aws::S3::S3Client &, const S3::URI &);
2021-04-13 10:59:02 +00:00
String next();
2021-04-06 19:18:45 +00:00
private:
class Impl;
/// shared_ptr to have copy constructor
std::shared_ptr<Impl> pimpl;
};
class KeysIterator
{
public:
explicit KeysIterator(const std::vector<String> & keys_);
String next();
private:
class Impl;
/// shared_ptr to have copy constructor
std::shared_ptr<Impl> pimpl;
};
2021-04-13 10:59:02 +00:00
using IteratorWrapper = std::function<String()>;
2021-04-08 00:09:15 +00:00
2021-03-22 17:12:31 +00:00
static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column);
StorageS3Source(
bool need_path,
bool need_file,
const String & format,
String name_,
const Block & sample_block,
2021-04-12 19:35:26 +00:00
ContextPtr context_,
2021-08-23 19:05:28 +00:00
std::optional<FormatSettings> format_settings_,
2021-04-08 00:09:15 +00:00
const ColumnsDescription & columns_,
UInt64 max_block_size_,
2021-05-19 21:42:25 +00:00
UInt64 max_single_read_retries_,
String compression_hint_,
2021-04-08 00:09:15 +00:00
const std::shared_ptr<Aws::S3::S3Client> & client_,
2021-03-22 17:12:31 +00:00
const String & bucket,
2021-04-10 02:21:18 +00:00
std::shared_ptr<IteratorWrapper> file_iterator_);
2021-03-22 17:12:31 +00:00
String getName() const override;
Chunk generate() override;
2021-12-27 19:42:56 +00:00
void onCancel() override;
2021-03-22 17:12:31 +00:00
private:
String name;
2021-04-08 00:09:15 +00:00
String bucket;
String file_path;
String format;
ColumnsDescription columns_desc;
UInt64 max_block_size;
2021-05-19 21:42:25 +00:00
UInt64 max_single_read_retries;
2021-04-08 00:09:15 +00:00
String compression_hint;
std::shared_ptr<Aws::S3::S3Client> client;
Block sample_block;
2021-08-23 19:05:28 +00:00
std::optional<FormatSettings> format_settings;
2021-04-08 00:09:15 +00:00
2021-03-22 17:12:31 +00:00
std::unique_ptr<ReadBuffer> read_buf;
2021-07-20 18:18:43 +00:00
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
/// onCancel and generate can be called concurrently
std::mutex reader_mutex;
2021-03-22 17:12:31 +00:00
bool initialized = false;
bool with_file_column = false;
bool with_path_column = false;
2021-04-10 02:21:18 +00:00
std::shared_ptr<IteratorWrapper> file_iterator;
2021-04-08 00:09:15 +00:00
/// Recreate ReadBuffer and BlockInputStream for each file.
bool initialize();
2021-03-22 17:12:31 +00:00
};
2019-05-31 07:27:14 +00:00
/**
2019-06-01 21:18:20 +00:00
* This class represents table engine for external S3 urls.
2019-05-31 07:27:14 +00:00
* It sends HTTP GET to server when select is called and
2019-06-01 21:18:20 +00:00
* HTTP PUT when insert is called.
2019-05-31 07:27:14 +00:00
*/
2021-06-15 19:55:21 +00:00
class StorageS3 : public shared_ptr_helper<StorageS3>, public IStorage, WithContext
2019-05-23 09:03:39 +00:00
{
public:
2021-04-23 12:18:23 +00:00
StorageS3(
const S3::URI & uri,
const String & access_key_id,
const String & secret_access_key,
2019-12-04 16:06:55 +00:00
const StorageID & table_id_,
2019-06-01 21:18:20 +00:00
const String & format_name_,
2021-05-19 21:42:25 +00:00
UInt64 max_single_read_retries_,
UInt64 min_upload_part_size_,
UInt64 upload_part_size_multiply_factor_,
UInt64 upload_part_size_multiply_parts_count_threshold_,
UInt64 max_single_part_upload_size_,
UInt64 max_connections_,
2019-06-01 21:18:20 +00:00
const ColumnsDescription & columns_,
2019-09-22 22:13:42 +00:00
const ConstraintsDescription & constraints_,
2021-04-23 12:18:23 +00:00
const String & comment,
ContextPtr context_,
2021-08-23 19:05:28 +00:00
std::optional<FormatSettings> format_settings_,
2021-04-13 20:17:25 +00:00
const String & compression_method_ = "",
2021-10-26 12:22:13 +00:00
bool distributed_processing_ = false,
ASTPtr partition_by_ = nullptr);
2019-06-01 21:18:20 +00:00
String getName() const override
{
return name;
2019-06-01 21:18:20 +00:00
}
2020-08-03 13:54:14 +00:00
Pipe read(
2019-09-22 22:13:42 +00:00
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
2019-05-23 09:03:39 +00:00
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
2021-07-23 19:33:59 +00:00
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
2019-05-23 09:03:39 +00:00
2021-06-21 15:44:36 +00:00
void truncate(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, TableExclusiveLockHolder &) override;
NamesAndTypesList getVirtuals() const override;
2020-04-27 13:55:30 +00:00
2021-07-14 08:49:05 +00:00
bool supportsPartitionBy() const override;
2021-09-07 11:17:25 +00:00
static StorageS3Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context);
static ColumnsDescription getTableStructureFromData(
const String & format,
const S3::URI & uri,
const String & access_key_id,
const String & secret_access_key,
UInt64 max_connections,
UInt64 max_single_read_retries,
const String & compression_method,
bool distributed_processing,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
2021-03-22 17:12:31 +00:00
private:
2021-04-12 21:42:52 +00:00
friend class StorageS3Cluster;
friend class TableFunctionS3Cluster;
2021-03-25 13:49:07 +00:00
2021-06-28 17:02:22 +00:00
struct ClientAuthentication
2021-03-16 18:41:29 +00:00
{
const S3::URI uri;
const String access_key_id;
const String secret_access_key;
const UInt64 max_connections;
std::shared_ptr<Aws::S3::S3Client> client;
S3AuthSettings auth_settings;
};
2021-06-28 17:02:22 +00:00
ClientAuthentication client_auth;
std::vector<String> keys;
2019-05-23 09:03:39 +00:00
2019-05-31 07:27:14 +00:00
String format_name;
2021-05-19 21:42:25 +00:00
UInt64 max_single_read_retries;
size_t min_upload_part_size;
size_t upload_part_size_multiply_factor;
size_t upload_part_size_multiply_parts_count_threshold;
size_t max_single_part_upload_size;
String compression_method;
String name;
2021-04-13 20:17:25 +00:00
const bool distributed_processing;
2021-08-23 19:05:28 +00:00
std::optional<FormatSettings> format_settings;
2021-10-26 12:22:13 +00:00
ASTPtr partition_by;
bool is_key_with_globs = false;
2021-06-28 17:02:22 +00:00
static void updateClientAndAuthSettings(ContextPtr, ClientAuthentication &);
static std::shared_ptr<StorageS3Source::IteratorWrapper> createFileIterator(const ClientAuthentication & client_auth, const std::vector<String> & keys, bool is_key_with_globs, bool distributed_processing, ContextPtr local_context);
static ColumnsDescription getTableStructureFromDataImpl(
const String & format,
const ClientAuthentication & client_auth,
UInt64 max_single_read_retries,
const String & compression_method,
bool distributed_processing,
bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
2022-02-23 19:31:16 +00:00
2022-02-28 13:29:05 +00:00
bool isColumnOriented() const override;
2019-05-23 09:03:39 +00:00
};
2019-12-11 14:21:48 +00:00
2019-05-23 09:03:39 +00:00
}
2019-12-11 14:21:48 +00:00
#endif