mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
Merge pull request #48724 from johanngan/sse-kms
Support SSE-KMS configuration with S3 client
This commit is contained in:
commit
a68a023ca7
@ -155,6 +155,9 @@ The following settings can be specified in configuration file for given endpoint
|
||||
- `no_sign_request` - Ignore all the credentials so requests are not signed. Useful for accessing public buckets.
|
||||
- `header` — Adds specified HTTP header to a request to given endpoint. Optional, can be specified multiple times.
|
||||
- `server_side_encryption_customer_key_base64` — If specified, required headers for accessing S3 objects with SSE-C encryption will be set. Optional.
|
||||
- `server_side_encryption_kms_key_id` - If specified, required headers for accessing S3 objects with [SSE-KMS encryption](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingKMSEncryption.html) will be set. If an empty string is specified, the AWS managed S3 key will be used. Optional.
|
||||
- `server_side_encryption_kms_encryption_context` - If specified alongside `server_side_encryption_kms_key_id`, the given encryption context header for SSE-KMS will be set. Optional.
|
||||
- `server_side_encryption_kms_bucket_key_enabled` - If specified alongside `server_side_encryption_kms_key_id`, the header to enable S3 bucket keys for SSE-KMS will be set. Optional, can be `true` or `false`, defaults to nothing (matches the bucket-level setting).
|
||||
- `max_single_read_retries` — The maximum number of attempts during single read. Default value is `4`. Optional.
|
||||
- `max_put_rps`, `max_put_burst`, `max_get_rps` and `max_get_burst` - Throttling settings (see description above) to use for specific endpoint instead of per query. Optional.
|
||||
|
||||
@ -173,6 +176,9 @@ The following settings can be specified in configuration file for given endpoint
|
||||
<!-- <no_sign_request>false</no_sign_request> -->
|
||||
<!-- <header>Authorization: Bearer SOME-TOKEN</header> -->
|
||||
<!-- <server_side_encryption_customer_key_base64>BASE64-ENCODED-KEY</server_side_encryption_customer_key_base64> -->
|
||||
<!-- <server_side_encryption_kms_key_id>KMS_KEY_ID</server_side_encryption_kms_key_id> -->
|
||||
<!-- <server_side_encryption_kms_encryption_context>KMS_ENCRYPTION_CONTEXT</server_side_encryption_kms_encryption_context> -->
|
||||
<!-- <server_side_encryption_kms_bucket_key_enabled>true</server_side_encryption_kms_bucket_key_enabled> -->
|
||||
<!-- <max_single_read_retries>4</max_single_read_retries> -->
|
||||
</endpoint-name>
|
||||
</s3>
|
||||
|
@ -1055,7 +1055,11 @@ Configuration markup:
|
||||
<access_key_id>your_access_key_id</access_key_id>
|
||||
<secret_access_key>your_secret_access_key</secret_access_key>
|
||||
<region></region>
|
||||
<header>Authorization: Bearer SOME-TOKEN</header>
|
||||
<server_side_encryption_customer_key_base64>your_base64_encoded_customer_key</server_side_encryption_customer_key_base64>
|
||||
<server_side_encryption_kms_key_id>your_kms_key_id</server_side_encryption_kms_key_id>
|
||||
<server_side_encryption_kms_encryption_context>your_kms_encryption_context</server_side_encryption_kms_encryption_context>
|
||||
<server_side_encryption_kms_bucket_key_enabled>true</server_side_encryption_kms_bucket_key_enabled>
|
||||
<proxy>
|
||||
<uri>http://proxy1</uri>
|
||||
<uri>http://proxy2</uri>
|
||||
@ -1106,7 +1110,11 @@ Optional parameters:
|
||||
- `min_bytes_for_seek` — Minimal number of bytes to use seek operation instead of sequential read. Default value is `1 Mb`.
|
||||
- `metadata_path` — Path on local FS to store metadata files for S3. Default value is `/var/lib/clickhouse/disks/<disk_name>/`.
|
||||
- `skip_access_check` — If true, disk access checks will not be performed on disk start-up. Default value is `false`.
|
||||
- `header` — Adds specified HTTP header to a request to given endpoint. Optional, can be specified multiple times.
|
||||
- `server_side_encryption_customer_key_base64` — If specified, required headers for accessing S3 objects with SSE-C encryption will be set.
|
||||
- `server_side_encryption_kms_key_id` - If specified, required headers for accessing S3 objects with [SSE-KMS encryption](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingKMSEncryption.html) will be set. If an empty string is specified, the AWS managed S3 key will be used. Optional.
|
||||
- `server_side_encryption_kms_encryption_context` - If specified alongside `server_side_encryption_kms_key_id`, the given encryption context header for SSE-KMS will be set. Optional.
|
||||
- `server_side_encryption_kms_bucket_key_enabled` - If specified alongside `server_side_encryption_kms_key_id`, the header to enable S3 bucket keys for SSE-KMS will be set. Optional, can be `true` or `false`, defaults to nothing (matches the bucket-level setting).
|
||||
- `s3_max_put_rps` — Maximum PUT requests per second rate before throttling. Default value is `0` (unlimited).
|
||||
- `s3_max_put_burst` — Max number of requests that can be issued simultaneously before hitting request per second limit. By default (`0` value) equals to `s3_max_put_rps`.
|
||||
- `s3_max_get_rps` — Maximum GET requests per second rate before throttling. Default value is `0` (unlimited).
|
||||
|
@ -135,7 +135,7 @@ func TestConfigFileFrameCopy(t *testing.T) {
|
||||
sizes := map[string]int64{
|
||||
"users.xml": int64(2017),
|
||||
"default-password.xml": int64(188),
|
||||
"config.xml": int64(61260),
|
||||
"config.xml": int64(61662),
|
||||
"server-include.xml": int64(168),
|
||||
"user-include.xml": int64(559),
|
||||
}
|
||||
|
@ -1260,8 +1260,12 @@
|
||||
<access_key_id>REPLACE_ME</access_key_id>
|
||||
<secret_access_key>REPLACE_ME</secret_access_key>
|
||||
<region></region>
|
||||
<header>Authorization: Bearer SOME-TOKEN</header>
|
||||
<server_side_encryption_customer_key_base64>your_base64_encoded_customer_key
|
||||
</server_side_encryption_customer_key_base64>
|
||||
<server_side_encryption_kms_key_id>REPLACE_ME</server_side_encryption_kms_key_id>
|
||||
<server_side_encryption_kms_encryption_context>REPLACE_ME</server_side_encryption_kms_encryption_context>
|
||||
<server_side_encryption_kms_bucket_key_enabled>true</server_side_encryption_kms_bucket_key_enabled>
|
||||
<proxy>
|
||||
<uri>http://proxy1</uri>
|
||||
<uri>http://proxy2</uri>
|
||||
|
@ -65,6 +65,7 @@ namespace
|
||||
credentials.GetAWSAccessKeyId(),
|
||||
credentials.GetAWSSecretKey(),
|
||||
settings.auth_settings.server_side_encryption_customer_key_base64,
|
||||
settings.auth_settings.server_side_encryption_kms_config,
|
||||
std::move(headers),
|
||||
S3::CredentialsConfiguration
|
||||
{
|
||||
|
@ -102,6 +102,7 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
|
||||
credentials.GetAWSAccessKeyId(),
|
||||
credentials.GetAWSSecretKey(),
|
||||
auth_settings.server_side_encryption_customer_key_base64,
|
||||
auth_settings.server_side_encryption_kms_config,
|
||||
std::move(headers),
|
||||
S3::CredentialsConfiguration
|
||||
{
|
||||
|
@ -142,6 +142,9 @@ std::unique_ptr<S3::Client> getClient(
|
||||
= [proxy_config](const auto & request_config) { proxy_config->errorReport(request_config); };
|
||||
}
|
||||
|
||||
HTTPHeaderEntries headers = S3::getHTTPHeaders(config_prefix, config);
|
||||
S3::ServerSideEncryptionKMSConfig sse_kms_config = S3::getSSEKMSConfig(config_prefix, config);
|
||||
|
||||
client_configuration.retryStrategy
|
||||
= std::make_shared<Aws::Client::DefaultRetryStrategy>(config.getUInt(config_prefix + ".retry_attempts", 10));
|
||||
|
||||
@ -151,7 +154,8 @@ std::unique_ptr<S3::Client> getClient(
|
||||
config.getString(config_prefix + ".access_key_id", ""),
|
||||
config.getString(config_prefix + ".secret_access_key", ""),
|
||||
config.getString(config_prefix + ".server_side_encryption_customer_key_base64", ""),
|
||||
{},
|
||||
std::move(sse_kms_config),
|
||||
std::move(headers),
|
||||
S3::CredentialsConfiguration
|
||||
{
|
||||
config.getBool(config_prefix + ".use_environment_credentials", config.getBool("s3.use_environment_credentials", true)),
|
||||
|
@ -174,6 +174,6 @@ void registerDiskS3(DiskFactory & factory, bool global_skip_access_check)
|
||||
|
||||
#else
|
||||
|
||||
void registerDiskS3(DiskFactory &, bool /* global_skip_access_check */) {}
|
||||
void registerDiskS3(DB::DiskFactory &, bool /* global_skip_access_check */) {}
|
||||
|
||||
#endif
|
||||
|
@ -96,6 +96,7 @@ void verifyClientConfiguration(const Aws::Client::ClientConfiguration & client_c
|
||||
|
||||
std::unique_ptr<Client> Client::create(
|
||||
size_t max_redirects_,
|
||||
ServerSideEncryptionKMSConfig sse_kms_config_,
|
||||
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider,
|
||||
const Aws::Client::ClientConfiguration & client_configuration,
|
||||
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
|
||||
@ -103,7 +104,7 @@ std::unique_ptr<Client> Client::create(
|
||||
{
|
||||
verifyClientConfiguration(client_configuration);
|
||||
return std::unique_ptr<Client>(
|
||||
new Client(max_redirects_, credentials_provider, client_configuration, sign_payloads, use_virtual_addressing));
|
||||
new Client(max_redirects_, std::move(sse_kms_config_), credentials_provider, client_configuration, sign_payloads, use_virtual_addressing));
|
||||
}
|
||||
|
||||
std::unique_ptr<Client> Client::create(const Client & other)
|
||||
@ -113,12 +114,14 @@ std::unique_ptr<Client> Client::create(const Client & other)
|
||||
|
||||
Client::Client(
|
||||
size_t max_redirects_,
|
||||
ServerSideEncryptionKMSConfig sse_kms_config_,
|
||||
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider,
|
||||
const Aws::Client::ClientConfiguration & client_configuration,
|
||||
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
|
||||
bool use_virtual_addressing)
|
||||
: Aws::S3::S3Client(credentials_provider, client_configuration, std::move(sign_payloads), use_virtual_addressing)
|
||||
, max_redirects(max_redirects_)
|
||||
, sse_kms_config(std::move(sse_kms_config_))
|
||||
, log(&Poco::Logger::get("S3Client"))
|
||||
{
|
||||
auto * endpoint_provider = dynamic_cast<Aws::S3::Endpoint::S3DefaultEpProviderBase *>(accessEndpointProvider().get());
|
||||
@ -141,6 +144,7 @@ Client::Client(const Client & other)
|
||||
, detect_region(other.detect_region)
|
||||
, provider_type(other.provider_type)
|
||||
, max_redirects(other.max_redirects)
|
||||
, sse_kms_config(other.sse_kms_config)
|
||||
, log(&Poco::Logger::get("S3Client"))
|
||||
{
|
||||
cache = std::make_shared<ClientCache>(*other.cache);
|
||||
@ -178,6 +182,28 @@ void Client::insertRegionOverride(const std::string & bucket, const std::string
|
||||
LOG_INFO(log, "Detected different region ('{}') for bucket {} than the one defined ('{}')", region, bucket, explicit_region);
|
||||
}
|
||||
|
||||
template <typename RequestType>
|
||||
void Client::setKMSHeaders(RequestType & request) const
|
||||
{
|
||||
// Don't do anything unless a key ID was specified
|
||||
if (sse_kms_config.key_id)
|
||||
{
|
||||
request.SetServerSideEncryption(Model::ServerSideEncryption::aws_kms);
|
||||
// If the key ID was specified but is empty, treat it as using the AWS managed key and omit the header
|
||||
if (!sse_kms_config.key_id->empty())
|
||||
request.SetSSEKMSKeyId(*sse_kms_config.key_id);
|
||||
if (sse_kms_config.encryption_context)
|
||||
request.SetSSEKMSEncryptionContext(*sse_kms_config.encryption_context);
|
||||
if (sse_kms_config.bucket_key_enabled)
|
||||
request.SetBucketKeyEnabled(*sse_kms_config.bucket_key_enabled);
|
||||
}
|
||||
}
|
||||
|
||||
// Explicitly instantiate this method only for the request types that support KMS headers
|
||||
template void Client::setKMSHeaders<CreateMultipartUploadRequest>(CreateMultipartUploadRequest & request) const;
|
||||
template void Client::setKMSHeaders<CopyObjectRequest>(CopyObjectRequest & request) const;
|
||||
template void Client::setKMSHeaders<PutObjectRequest>(PutObjectRequest & request) const;
|
||||
|
||||
Model::HeadObjectOutcome Client::HeadObject(const HeadObjectRequest & request) const
|
||||
{
|
||||
const auto & bucket = request.GetBucket();
|
||||
@ -574,6 +600,7 @@ std::unique_ptr<S3::Client> ClientFactory::create( // NOLINT
|
||||
const String & access_key_id,
|
||||
const String & secret_access_key,
|
||||
const String & server_side_encryption_customer_key_base64,
|
||||
ServerSideEncryptionKMSConfig sse_kms_config,
|
||||
HTTPHeaderEntries headers,
|
||||
CredentialsConfiguration credentials_configuration)
|
||||
{
|
||||
@ -596,6 +623,7 @@ std::unique_ptr<S3::Client> ClientFactory::create( // NOLINT
|
||||
Aws::Utils::HashingUtils::Base64Encode(Aws::Utils::HashingUtils::CalculateMD5(str_buffer))});
|
||||
}
|
||||
|
||||
// These will be added after request signing
|
||||
client_configuration.extra_headers = std::move(headers);
|
||||
|
||||
Aws::Auth::AWSCredentials credentials(access_key_id, secret_access_key);
|
||||
@ -607,6 +635,7 @@ std::unique_ptr<S3::Client> ClientFactory::create( // NOLINT
|
||||
client_configuration.retryStrategy = std::make_shared<Client::RetryStrategy>(std::move(client_configuration.retryStrategy));
|
||||
return Client::create(
|
||||
client_configuration.s3_max_redirects,
|
||||
std::move(sse_kms_config),
|
||||
credentials_provider,
|
||||
client_configuration, // Client configuration.
|
||||
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
|
||||
|
@ -1,7 +1,27 @@
|
||||
#pragma once
|
||||
|
||||
#include <optional>
|
||||
#include <base/types.h>
|
||||
|
||||
#include "config.h"
|
||||
|
||||
namespace DB::S3
|
||||
{
|
||||
|
||||
/// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/specifying-kms-encryption.html
|
||||
/// Needed by S3Common.h even if USE_AWS_S3 is 0
|
||||
struct ServerSideEncryptionKMSConfig
|
||||
{
|
||||
// If key_id is non-null, enable SSE-KMS. If key_id is "", use the AWS managed key
|
||||
std::optional<String> key_id = std::nullopt;
|
||||
std::optional<String> encryption_context = std::nullopt;
|
||||
std::optional<bool> bucket_key_enabled = std::nullopt;
|
||||
|
||||
bool operator==(const ServerSideEncryptionKMSConfig & other) const = default;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#if USE_AWS_S3
|
||||
|
||||
#include <Common/assert_cast.h>
|
||||
@ -85,6 +105,7 @@ public:
|
||||
/// e.g. Client::RetryStrategy should be used
|
||||
static std::unique_ptr<Client> create(
|
||||
size_t max_redirects_,
|
||||
ServerSideEncryptionKMSConfig sse_kms_config_,
|
||||
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider,
|
||||
const Aws::Client::ClientConfiguration & client_configuration,
|
||||
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
|
||||
@ -143,6 +164,13 @@ public:
|
||||
std::shared_ptr<Aws::Client::RetryStrategy> wrapped_strategy;
|
||||
};
|
||||
|
||||
/// SSE-KMS headers MUST be signed, so they need to be added before the SDK signs the message
|
||||
/// (before sending the request with one of the methods below).
|
||||
/// Per the docs (https://docs.aws.amazon.com/AmazonS3/latest/userguide/specifying-kms-encryption.html),
|
||||
/// the headers should only be set for PutObject, CopyObject, POST Object, and CreateMultipartUpload.
|
||||
template <typename RequestType>
|
||||
void setKMSHeaders(RequestType & request) const;
|
||||
|
||||
Model::HeadObjectOutcome HeadObject(const HeadObjectRequest & request) const;
|
||||
Model::ListObjectsV2Outcome ListObjectsV2(const ListObjectsV2Request & request) const;
|
||||
Model::ListObjectsOutcome ListObjects(const ListObjectsRequest & request) const;
|
||||
@ -165,6 +193,7 @@ public:
|
||||
ProviderType getProviderType() const;
|
||||
private:
|
||||
Client(size_t max_redirects_,
|
||||
ServerSideEncryptionKMSConfig sse_kms_config_,
|
||||
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider>& credentials_provider,
|
||||
const Aws::Client::ClientConfiguration& client_configuration,
|
||||
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
|
||||
@ -215,6 +244,8 @@ private:
|
||||
|
||||
const size_t max_redirects;
|
||||
|
||||
const ServerSideEncryptionKMSConfig sse_kms_config;
|
||||
|
||||
Poco::Logger * log;
|
||||
};
|
||||
|
||||
@ -231,6 +262,7 @@ public:
|
||||
const String & access_key_id,
|
||||
const String & secret_access_key,
|
||||
const String & server_side_encryption_customer_key_base64,
|
||||
ServerSideEncryptionKMSConfig sse_kms_config,
|
||||
HTTPHeaderEntries headers,
|
||||
CredentialsConfiguration credentials_configuration);
|
||||
|
||||
|
@ -100,9 +100,8 @@ namespace
|
||||
std::mutex bg_tasks_mutex;
|
||||
std::condition_variable bg_tasks_condvar;
|
||||
|
||||
void createMultipartUpload()
|
||||
void fillCreateMultipartRequest(S3::CreateMultipartUploadRequest & request)
|
||||
{
|
||||
S3::CreateMultipartUploadRequest request;
|
||||
request.SetBucket(dest_bucket);
|
||||
request.SetKey(dest_key);
|
||||
|
||||
@ -116,6 +115,14 @@ namespace
|
||||
if (!storage_class_name.empty())
|
||||
request.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(storage_class_name));
|
||||
|
||||
client_ptr->setKMSHeaders(request);
|
||||
}
|
||||
|
||||
void createMultipartUpload()
|
||||
{
|
||||
S3::CreateMultipartUploadRequest request;
|
||||
fillCreateMultipartRequest(request);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::S3CreateMultipartUpload);
|
||||
if (for_disk_s3)
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3CreateMultipartUpload);
|
||||
@ -465,6 +472,8 @@ namespace
|
||||
|
||||
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
|
||||
request.SetContentType("binary/octet-stream");
|
||||
|
||||
client_ptr->setKMSHeaders(request);
|
||||
}
|
||||
|
||||
void processPutRequest(const S3::PutObjectRequest & request)
|
||||
@ -660,6 +669,8 @@ namespace
|
||||
|
||||
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
|
||||
request.SetContentType("binary/octet-stream");
|
||||
|
||||
client_ptr->setKMSHeaders(request);
|
||||
}
|
||||
|
||||
void processCopyRequest(const S3::CopyObjectRequest & request)
|
||||
|
@ -8,38 +8,66 @@
|
||||
#include <Poco/Net/HTTPRequestHandlerFactory.h>
|
||||
#include <Poco/Net/HTTPServer.h>
|
||||
#include <Poco/Net/HTTPServerParams.h>
|
||||
#include <Poco/Net/HTTPServerRequest.h>
|
||||
#include <Poco/Net/HTTPServerResponse.h>
|
||||
#include <Poco/Net/MessageHeader.h>
|
||||
#include <Poco/Net/NetException.h>
|
||||
#include <Poco/Net/ServerSocket.h>
|
||||
#include <Poco/AutoPtr.h>
|
||||
#include <Poco/SharedPtr.h>
|
||||
|
||||
class MockRequestHandler : public Poco::Net::HTTPRequestHandler
|
||||
{
|
||||
Poco::Net::MessageHeader & last_request_header;
|
||||
|
||||
public:
|
||||
MockRequestHandler(Poco::Net::MessageHeader & last_request_header_)
|
||||
: Poco::Net::HTTPRequestHandler(), last_request_header(last_request_header_)
|
||||
{
|
||||
}
|
||||
|
||||
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override
|
||||
{
|
||||
response.setStatus(Poco::Net::HTTPResponse::HTTP_OK);
|
||||
last_request_header = request;
|
||||
response.send();
|
||||
}
|
||||
};
|
||||
|
||||
template <typename RequestHandler>
|
||||
class HTTPRequestHandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
|
||||
{
|
||||
Poco::Net::MessageHeader & last_request_header;
|
||||
|
||||
virtual Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest &) override
|
||||
{
|
||||
return new RequestHandler();
|
||||
return new MockRequestHandler(last_request_header);
|
||||
}
|
||||
|
||||
public:
|
||||
HTTPRequestHandlerFactory(Poco::Net::MessageHeader & last_request_header_)
|
||||
: Poco::Net::HTTPRequestHandlerFactory(), last_request_header(last_request_header_)
|
||||
{
|
||||
}
|
||||
|
||||
virtual ~HTTPRequestHandlerFactory() override
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
template <typename RequestHandler>
|
||||
class TestPocoHTTPServer
|
||||
{
|
||||
std::unique_ptr<Poco::Net::ServerSocket> server_socket;
|
||||
Poco::SharedPtr<HTTPRequestHandlerFactory<RequestHandler>> handler_factory;
|
||||
Poco::SharedPtr<HTTPRequestHandlerFactory> handler_factory;
|
||||
Poco::AutoPtr<Poco::Net::HTTPServerParams> server_params;
|
||||
std::unique_ptr<Poco::Net::HTTPServer> server;
|
||||
// Stores the last request header handled. It's obviously not thread-safe to share the same
|
||||
// reference across request handlers, but it's good enough for this the purposes of this test.
|
||||
Poco::Net::MessageHeader last_request_header;
|
||||
|
||||
public:
|
||||
TestPocoHTTPServer():
|
||||
server_socket(std::make_unique<Poco::Net::ServerSocket>(0)),
|
||||
handler_factory(new HTTPRequestHandlerFactory<RequestHandler>()),
|
||||
handler_factory(new HTTPRequestHandlerFactory(last_request_header)),
|
||||
server_params(new Poco::Net::HTTPServerParams()),
|
||||
server(std::make_unique<Poco::Net::HTTPServer>(handler_factory, *server_socket, server_params))
|
||||
{
|
||||
@ -50,4 +78,9 @@ public:
|
||||
{
|
||||
return "http://" + server_socket->address().toString();
|
||||
}
|
||||
|
||||
const Poco::Net::MessageHeader & getLastRequestHeader() const
|
||||
{
|
||||
return last_request_header;
|
||||
}
|
||||
};
|
||||
|
@ -7,13 +7,9 @@
|
||||
#if USE_AWS_S3
|
||||
|
||||
#include <memory>
|
||||
#include <ostream>
|
||||
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
#include <Poco/Net/HTTPRequestHandler.h>
|
||||
#include <Poco/Net/HTTPServerRequest.h>
|
||||
#include <Poco/Net/HTTPServerResponse.h>
|
||||
#include <Poco/URI.h>
|
||||
|
||||
#include <aws/core/client/AWSError.h>
|
||||
@ -25,6 +21,7 @@
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/ReadSettings.h>
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
#include <IO/S3Common.h>
|
||||
#include <IO/S3/Client.h>
|
||||
#include <IO/HTTPHeaderEntries.h>
|
||||
@ -41,83 +38,33 @@ public:
|
||||
~NoRetryStrategy() override = default;
|
||||
};
|
||||
|
||||
|
||||
TEST(IOTestAwsS3Client, AppendExtraSSECHeaders)
|
||||
String getSSEAndSignedHeaders(const Poco::Net::MessageHeader & message_header)
|
||||
{
|
||||
/// See https://github.com/ClickHouse/ClickHouse/pull/19748
|
||||
|
||||
class MyRequestHandler : public Poco::Net::HTTPRequestHandler
|
||||
String content;
|
||||
for (const auto & [header_name, header_value] : message_header)
|
||||
{
|
||||
public:
|
||||
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override
|
||||
if (boost::algorithm::starts_with(header_name, "x-amz-server-side-encryption"))
|
||||
{
|
||||
response.setStatus(Poco::Net::HTTPResponse::HTTP_OK);
|
||||
std::ostream & out = response.send();
|
||||
for (const auto & [header_name, header_value] : request)
|
||||
{
|
||||
if (boost::algorithm::starts_with(header_name, "x-amz-server-side-encryption-customer-"))
|
||||
{
|
||||
out << header_name << ": " << header_value << "\n";
|
||||
}
|
||||
else if (header_name == "authorization")
|
||||
{
|
||||
std::vector<String> parts;
|
||||
boost::split(parts, header_value, [](char c){ return c == ' '; });
|
||||
for (const auto & part : parts)
|
||||
{
|
||||
if (boost::algorithm::starts_with(part, "SignedHeaders="))
|
||||
out << header_name << ": ... " << part << " ...\n";
|
||||
}
|
||||
}
|
||||
}
|
||||
out.flush();
|
||||
content += header_name + ": " + header_value + "\n";
|
||||
}
|
||||
};
|
||||
else if (header_name == "authorization")
|
||||
{
|
||||
std::vector<String> parts;
|
||||
boost::split(parts, header_value, [](char c){ return c == ' '; });
|
||||
for (const auto & part : parts)
|
||||
{
|
||||
if (boost::algorithm::starts_with(part, "SignedHeaders="))
|
||||
content += header_name + ": ... " + part + " ...\n";
|
||||
}
|
||||
}
|
||||
}
|
||||
return content;
|
||||
}
|
||||
|
||||
TestPocoHTTPServer<MyRequestHandler> http;
|
||||
|
||||
DB::RemoteHostFilter remote_host_filter;
|
||||
unsigned int s3_max_redirects = 100;
|
||||
DB::S3::URI uri(http.getUrl() + "/IOTestAwsS3ClientAppendExtraHeaders/test.txt");
|
||||
String access_key_id = "ACCESS_KEY_ID";
|
||||
String secret_access_key = "SECRET_ACCESS_KEY";
|
||||
String region = "us-east-1";
|
||||
void doReadRequest(std::shared_ptr<const DB::S3::Client> client, const DB::S3::URI & uri)
|
||||
{
|
||||
String version_id;
|
||||
UInt64 max_single_read_retries = 1;
|
||||
bool enable_s3_requests_logging = false;
|
||||
DB::S3::PocoHTTPClientConfiguration client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
|
||||
region,
|
||||
remote_host_filter,
|
||||
s3_max_redirects,
|
||||
enable_s3_requests_logging,
|
||||
/* for_disk_s3 = */ false,
|
||||
/* get_request_throttler = */ {},
|
||||
/* put_request_throttler = */ {}
|
||||
);
|
||||
|
||||
client_configuration.endpointOverride = uri.endpoint;
|
||||
client_configuration.retryStrategy = std::make_shared<NoRetryStrategy>();
|
||||
|
||||
String server_side_encryption_customer_key_base64 = "Kv/gDqdWVGIT4iDqg+btQvV3lc1idlm4WI+MMOyHOAw=";
|
||||
DB::HTTPHeaderEntries headers;
|
||||
bool use_environment_credentials = false;
|
||||
bool use_insecure_imds_request = false;
|
||||
|
||||
std::shared_ptr<DB::S3::Client> client = DB::S3::ClientFactory::instance().create(
|
||||
client_configuration,
|
||||
uri.is_virtual_hosted_style,
|
||||
access_key_id,
|
||||
secret_access_key,
|
||||
server_side_encryption_customer_key_base64,
|
||||
headers,
|
||||
DB::S3::CredentialsConfiguration
|
||||
{
|
||||
.use_environment_credentials = use_environment_credentials,
|
||||
.use_insecure_imds_request = use_insecure_imds_request
|
||||
}
|
||||
);
|
||||
|
||||
ASSERT_TRUE(client);
|
||||
|
||||
DB::ReadSettings read_settings;
|
||||
DB::S3Settings::RequestSettings request_settings;
|
||||
@ -133,7 +80,170 @@ TEST(IOTestAwsS3Client, AppendExtraSSECHeaders)
|
||||
|
||||
String content;
|
||||
DB::readStringUntilEOF(content, read_buffer);
|
||||
EXPECT_EQ(content, "authorization: ... SignedHeaders=amz-sdk-invocation-id;amz-sdk-request;content-type;host;x-amz-api-version;x-amz-content-sha256;x-amz-date, ...\nx-amz-server-side-encryption-customer-algorithm: AES256\nx-amz-server-side-encryption-customer-key: Kv/gDqdWVGIT4iDqg+btQvV3lc1idlm4WI+MMOyHOAw=\nx-amz-server-side-encryption-customer-key-md5: fMNuOw6OLU5GG2vc6RTA+g==\n");
|
||||
}
|
||||
|
||||
void doWriteRequest(std::shared_ptr<const DB::S3::Client> client, const DB::S3::URI & uri)
|
||||
{
|
||||
UInt64 max_unexpected_write_error_retries = 1;
|
||||
|
||||
DB::S3Settings::RequestSettings request_settings;
|
||||
request_settings.max_unexpected_write_error_retries = max_unexpected_write_error_retries;
|
||||
DB::WriteBufferFromS3 write_buffer(
|
||||
client,
|
||||
uri.bucket,
|
||||
uri.key,
|
||||
request_settings
|
||||
);
|
||||
|
||||
write_buffer.write('\0'); // doesn't matter what we write here, just needs to be something
|
||||
write_buffer.finalize();
|
||||
}
|
||||
|
||||
using RequestFn = std::function<void(std::shared_ptr<const DB::S3::Client>, const DB::S3::URI &)>;
|
||||
|
||||
void testServerSideEncryption(
|
||||
RequestFn do_request,
|
||||
String server_side_encryption_customer_key_base64,
|
||||
DB::S3::ServerSideEncryptionKMSConfig sse_kms_config,
|
||||
String expected_headers)
|
||||
{
|
||||
TestPocoHTTPServer http;
|
||||
|
||||
DB::RemoteHostFilter remote_host_filter;
|
||||
unsigned int s3_max_redirects = 100;
|
||||
DB::S3::URI uri(http.getUrl() + "/IOTestAwsS3ClientAppendExtraHeaders/test.txt");
|
||||
String access_key_id = "ACCESS_KEY_ID";
|
||||
String secret_access_key = "SECRET_ACCESS_KEY";
|
||||
String region = "us-east-1";
|
||||
bool enable_s3_requests_logging = false;
|
||||
DB::S3::PocoHTTPClientConfiguration client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
|
||||
region,
|
||||
remote_host_filter,
|
||||
s3_max_redirects,
|
||||
enable_s3_requests_logging,
|
||||
/* for_disk_s3 = */ false,
|
||||
/* get_request_throttler = */ {},
|
||||
/* put_request_throttler = */ {}
|
||||
);
|
||||
|
||||
client_configuration.endpointOverride = uri.endpoint;
|
||||
client_configuration.retryStrategy = std::make_shared<NoRetryStrategy>();
|
||||
|
||||
DB::HTTPHeaderEntries headers;
|
||||
bool use_environment_credentials = false;
|
||||
bool use_insecure_imds_request = false;
|
||||
|
||||
std::shared_ptr<DB::S3::Client> client = DB::S3::ClientFactory::instance().create(
|
||||
client_configuration,
|
||||
uri.is_virtual_hosted_style,
|
||||
access_key_id,
|
||||
secret_access_key,
|
||||
server_side_encryption_customer_key_base64,
|
||||
sse_kms_config,
|
||||
headers,
|
||||
DB::S3::CredentialsConfiguration
|
||||
{
|
||||
.use_environment_credentials = use_environment_credentials,
|
||||
.use_insecure_imds_request = use_insecure_imds_request
|
||||
}
|
||||
);
|
||||
|
||||
ASSERT_TRUE(client);
|
||||
|
||||
do_request(client, uri);
|
||||
String content = getSSEAndSignedHeaders(http.getLastRequestHeader());
|
||||
EXPECT_EQ(content, expected_headers);
|
||||
}
|
||||
|
||||
TEST(IOTestAwsS3Client, AppendExtraSSECHeadersRead)
|
||||
{
|
||||
/// See https://github.com/ClickHouse/ClickHouse/pull/19748
|
||||
testServerSideEncryption(
|
||||
doReadRequest,
|
||||
"Kv/gDqdWVGIT4iDqg+btQvV3lc1idlm4WI+MMOyHOAw=",
|
||||
{},
|
||||
"authorization: ... SignedHeaders="
|
||||
"amz-sdk-invocation-id;"
|
||||
"amz-sdk-request;"
|
||||
"content-type;"
|
||||
"host;"
|
||||
"x-amz-api-version;"
|
||||
"x-amz-content-sha256;"
|
||||
"x-amz-date, ...\n"
|
||||
"x-amz-server-side-encryption-customer-algorithm: AES256\n"
|
||||
"x-amz-server-side-encryption-customer-key: Kv/gDqdWVGIT4iDqg+btQvV3lc1idlm4WI+MMOyHOAw=\n"
|
||||
"x-amz-server-side-encryption-customer-key-md5: fMNuOw6OLU5GG2vc6RTA+g==\n");
|
||||
}
|
||||
|
||||
TEST(IOTestAwsS3Client, AppendExtraSSECHeadersWrite)
|
||||
{
|
||||
/// See https://github.com/ClickHouse/ClickHouse/pull/19748
|
||||
testServerSideEncryption(
|
||||
doWriteRequest,
|
||||
"Kv/gDqdWVGIT4iDqg+btQvV3lc1idlm4WI+MMOyHOAw=",
|
||||
{},
|
||||
"authorization: ... SignedHeaders="
|
||||
"amz-sdk-invocation-id;"
|
||||
"amz-sdk-request;"
|
||||
"content-length;"
|
||||
"content-md5;"
|
||||
"content-type;"
|
||||
"host;"
|
||||
"x-amz-content-sha256;"
|
||||
"x-amz-date, ...\n"
|
||||
"x-amz-server-side-encryption-customer-algorithm: AES256\n"
|
||||
"x-amz-server-side-encryption-customer-key: Kv/gDqdWVGIT4iDqg+btQvV3lc1idlm4WI+MMOyHOAw=\n"
|
||||
"x-amz-server-side-encryption-customer-key-md5: fMNuOw6OLU5GG2vc6RTA+g==\n");
|
||||
}
|
||||
|
||||
TEST(IOTestAwsS3Client, AppendExtraSSEKMSHeadersRead)
|
||||
{
|
||||
DB::S3::ServerSideEncryptionKMSConfig sse_kms_config;
|
||||
sse_kms_config.key_id = "alias/test-key";
|
||||
sse_kms_config.encryption_context = "arn:aws:s3:::bucket_ARN";
|
||||
sse_kms_config.bucket_key_enabled = true;
|
||||
// KMS headers shouldn't be set on a read request
|
||||
testServerSideEncryption(
|
||||
doReadRequest,
|
||||
"",
|
||||
sse_kms_config,
|
||||
"authorization: ... SignedHeaders="
|
||||
"amz-sdk-invocation-id;"
|
||||
"amz-sdk-request;"
|
||||
"content-type;"
|
||||
"host;"
|
||||
"x-amz-api-version;"
|
||||
"x-amz-content-sha256;"
|
||||
"x-amz-date, ...\n");
|
||||
}
|
||||
|
||||
TEST(IOTestAwsS3Client, AppendExtraSSEKMSHeadersWrite)
|
||||
{
|
||||
DB::S3::ServerSideEncryptionKMSConfig sse_kms_config;
|
||||
sse_kms_config.key_id = "alias/test-key";
|
||||
sse_kms_config.encryption_context = "arn:aws:s3:::bucket_ARN";
|
||||
sse_kms_config.bucket_key_enabled = true;
|
||||
testServerSideEncryption(
|
||||
doWriteRequest,
|
||||
"",
|
||||
sse_kms_config,
|
||||
"authorization: ... SignedHeaders="
|
||||
"amz-sdk-invocation-id;"
|
||||
"amz-sdk-request;"
|
||||
"content-length;"
|
||||
"content-md5;"
|
||||
"content-type;"
|
||||
"host;"
|
||||
"x-amz-content-sha256;"
|
||||
"x-amz-date;"
|
||||
"x-amz-server-side-encryption;"
|
||||
"x-amz-server-side-encryption-aws-kms-key-id;"
|
||||
"x-amz-server-side-encryption-bucket-key-enabled;"
|
||||
"x-amz-server-side-encryption-context, ...\n"
|
||||
"x-amz-server-side-encryption: aws:kms\n"
|
||||
"x-amz-server-side-encryption-aws-kms-key-id: alias/test-key\n"
|
||||
"x-amz-server-side-encryption-bucket-key-enabled: true\n"
|
||||
"x-amz-server-side-encryption-context: arn:aws:s3:::bucket_ARN\n");
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -70,6 +70,41 @@ namespace ErrorCodes
|
||||
namespace S3
|
||||
{
|
||||
|
||||
HTTPHeaderEntries getHTTPHeaders(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
HTTPHeaderEntries headers;
|
||||
Poco::Util::AbstractConfiguration::Keys subconfig_keys;
|
||||
config.keys(config_elem, subconfig_keys);
|
||||
for (const std::string & subkey : subconfig_keys)
|
||||
{
|
||||
if (subkey.starts_with("header"))
|
||||
{
|
||||
auto header_str = config.getString(config_elem + "." + subkey);
|
||||
auto delimiter = header_str.find(':');
|
||||
if (delimiter == std::string::npos)
|
||||
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Malformed s3 header value");
|
||||
headers.emplace_back(header_str.substr(0, delimiter), header_str.substr(delimiter + 1, String::npos));
|
||||
}
|
||||
}
|
||||
return headers;
|
||||
}
|
||||
|
||||
ServerSideEncryptionKMSConfig getSSEKMSConfig(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
ServerSideEncryptionKMSConfig sse_kms_config;
|
||||
|
||||
if (config.has(config_elem + ".server_side_encryption_kms_key_id"))
|
||||
sse_kms_config.key_id = config.getString(config_elem + ".server_side_encryption_kms_key_id");
|
||||
|
||||
if (config.has(config_elem + ".server_side_encryption_kms_encryption_context"))
|
||||
sse_kms_config.encryption_context = config.getString(config_elem + ".server_side_encryption_kms_encryption_context");
|
||||
|
||||
if (config.has(config_elem + ".server_side_encryption_kms_bucket_key_enabled"))
|
||||
sse_kms_config.bucket_key_enabled = config.getBool(config_elem + ".server_side_encryption_kms_bucket_key_enabled");
|
||||
|
||||
return sse_kms_config;
|
||||
}
|
||||
|
||||
AuthSettings AuthSettings::loadFromConfig(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
auto access_key_id = config.getString(config_elem + ".access_key_id", "");
|
||||
@ -93,26 +128,15 @@ AuthSettings AuthSettings::loadFromConfig(const std::string & config_elem, const
|
||||
if (config.has(config_elem + ".no_sign_request"))
|
||||
no_sign_request = config.getBool(config_elem + ".no_sign_request");
|
||||
|
||||
HTTPHeaderEntries headers;
|
||||
Poco::Util::AbstractConfiguration::Keys subconfig_keys;
|
||||
config.keys(config_elem, subconfig_keys);
|
||||
for (const std::string & subkey : subconfig_keys)
|
||||
{
|
||||
if (subkey.starts_with("header"))
|
||||
{
|
||||
auto header_str = config.getString(config_elem + "." + subkey);
|
||||
auto delimiter = header_str.find(':');
|
||||
if (delimiter == std::string::npos)
|
||||
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Malformed s3 header value");
|
||||
headers.emplace_back(header_str.substr(0, delimiter), header_str.substr(delimiter + 1, String::npos));
|
||||
}
|
||||
}
|
||||
HTTPHeaderEntries headers = getHTTPHeaders(config_elem, config);
|
||||
ServerSideEncryptionKMSConfig sse_kms_config = getSSEKMSConfig(config_elem, config);
|
||||
|
||||
return AuthSettings
|
||||
{
|
||||
std::move(access_key_id), std::move(secret_access_key),
|
||||
std::move(region),
|
||||
std::move(server_side_encryption_customer_key_base64),
|
||||
std::move(sse_kms_config),
|
||||
std::move(headers),
|
||||
use_environment_credentials,
|
||||
use_insecure_imds_request,
|
||||
@ -135,6 +159,7 @@ void AuthSettings::updateFrom(const AuthSettings & from)
|
||||
headers = from.headers;
|
||||
region = from.region;
|
||||
server_side_encryption_customer_key_base64 = from.server_side_encryption_customer_key_base64;
|
||||
server_side_encryption_kms_config = from.server_side_encryption_kms_config;
|
||||
|
||||
if (from.use_environment_credentials.has_value())
|
||||
use_environment_credentials = from.use_environment_credentials;
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <IO/S3/Client.h>
|
||||
#include <IO/S3/PocoHTTPClient.h>
|
||||
#include <IO/HTTPHeaderEntries.h>
|
||||
|
||||
@ -14,7 +15,6 @@
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/Throttler_fwd.h>
|
||||
|
||||
#include <IO/S3/Client.h>
|
||||
#include <IO/S3/URI.h>
|
||||
|
||||
#include <aws/core/Aws.h>
|
||||
@ -71,6 +71,10 @@ namespace Poco::Util
|
||||
namespace DB::S3
|
||||
{
|
||||
|
||||
HTTPHeaderEntries getHTTPHeaders(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
ServerSideEncryptionKMSConfig getSSEKMSConfig(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
struct AuthSettings
|
||||
{
|
||||
static AuthSettings loadFromConfig(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config);
|
||||
@ -79,6 +83,7 @@ struct AuthSettings
|
||||
std::string secret_access_key;
|
||||
std::string region;
|
||||
std::string server_side_encryption_customer_key_base64;
|
||||
ServerSideEncryptionKMSConfig server_side_encryption_kms_config;
|
||||
|
||||
HTTPHeaderEntries headers;
|
||||
|
||||
|
@ -238,9 +238,8 @@ void WriteBufferFromS3::finalizeImpl()
|
||||
}
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::createMultipartUpload()
|
||||
void WriteBufferFromS3::fillCreateMultipartRequest(DB::S3::CreateMultipartUploadRequest & req)
|
||||
{
|
||||
DB::S3::CreateMultipartUploadRequest req;
|
||||
req.SetBucket(bucket);
|
||||
req.SetKey(key);
|
||||
|
||||
@ -250,6 +249,14 @@ void WriteBufferFromS3::createMultipartUpload()
|
||||
if (object_metadata.has_value())
|
||||
req.SetMetadata(object_metadata.value());
|
||||
|
||||
client_ptr->setKMSHeaders(req);
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::createMultipartUpload()
|
||||
{
|
||||
DB::S3::CreateMultipartUploadRequest req;
|
||||
fillCreateMultipartRequest(req);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::S3CreateMultipartUpload);
|
||||
if (write_settings.for_object_storage)
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3CreateMultipartUpload);
|
||||
@ -571,6 +578,8 @@ void WriteBufferFromS3::fillPutRequest(S3::PutObjectRequest & req)
|
||||
|
||||
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
|
||||
req.SetContentType("binary/octet-stream");
|
||||
|
||||
client_ptr->setKMSHeaders(req);
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::processPutRequest(const PutObjectTask & task)
|
||||
|
@ -61,6 +61,7 @@ private:
|
||||
void processWithStrictParts();
|
||||
void processWithDynamicParts();
|
||||
|
||||
void fillCreateMultipartRequest(S3::CreateMultipartUploadRequest & req);
|
||||
void createMultipartUpload();
|
||||
void writePart();
|
||||
void completeMultipartUpload();
|
||||
|
@ -1252,6 +1252,7 @@ void StorageS3::Configuration::connect(ContextPtr context)
|
||||
credentials.GetAWSAccessKeyId(),
|
||||
credentials.GetAWSSecretKey(),
|
||||
auth_settings.server_side_encryption_customer_key_base64,
|
||||
auth_settings.server_side_encryption_kms_config,
|
||||
std::move(headers),
|
||||
S3::CredentialsConfiguration{
|
||||
auth_settings.use_environment_credentials.value_or(context->getConfigRef().getBool("s3.use_environment_credentials", true)),
|
||||
|
Loading…
Reference in New Issue
Block a user