ClickHouse/src/IO/S3Common.cpp

676 lines
28 KiB
C++
Raw Normal View History

2019-12-06 14:37:21 +00:00
#include <Common/config.h>

#if USE_AWS_S3

#    include <IO/S3Common.h>

#    include <IO/WriteBufferFromString.h>
#    include <Storages/StorageS3Settings.h>

#    include <aws/core/Version.h>
#    include <aws/core/auth/AWSCredentialsProvider.h>
#    include <aws/core/auth/AWSCredentialsProviderChain.h>
#    include <aws/core/auth/STSCredentialsProvider.h>
#    include <aws/core/client/DefaultRetryStrategy.h>
#    include <aws/core/platform/Environment.h>
#    include <aws/core/platform/OSVersionInfo.h>
#    include <aws/core/utils/json/JsonSerializer.h>
#    include <aws/core/utils/logging/LogMacros.h>
#    include <aws/core/utils/logging/LogSystemInterface.h>
#    include <aws/core/utils/HashingUtils.h>
#    include <aws/core/http/HttpClientFactory.h>
#    include <aws/s3/S3Client.h>

#    include <IO/S3/PocoHTTPClientFactory.h>
#    include <IO/S3/PocoHTTPClient.h>
#    include <Poco/URI.h>
#    include <re2/re2.h>
#    include <boost/algorithm/string/case_conv.hpp>
#    include <common/logger_useful.h>

#    include <cstdarg>
#    include <cstdio>
2019-11-05 07:54:13 +00:00
namespace
{
/// Table of {AWS SDK log tag, Poco logger name} rows.
/// AWSLogger creates one Poco logger per row and uses the first row's logger as its default.
const char * S3_LOGGER_TAG_NAMES[][2] = {
    {"AWSClient", "AWSClient"},
    {"AWSAuthV4Signer", "AWSClient (AWSAuthV4Signer)"},
};
2020-05-30 21:57:37 +00:00
/// Translates an AWS SDK log level into the pair {ClickHouse logs level, Poco message priority}.
/// The returned reference points into a function-local static table.
/// A level that is missing from the table makes unordered_map::at throw std::out_of_range.
const std::pair<DB::LogsLevel, Poco::Message::Priority> & convertLogLevel(Aws::Utils::Logging::LogLevel log_level)
{
    using AWSLevel = Aws::Utils::Logging::LogLevel;
    using ConvertedLevel = std::pair<DB::LogsLevel, Poco::Message::Priority>;

    static const std::unordered_map<AWSLevel, ConvertedLevel> levels_table =
    {
        {AWSLevel::Off, {DB::LogsLevel::none, Poco::Message::PRIO_FATAL}},
        {AWSLevel::Fatal, {DB::LogsLevel::error, Poco::Message::PRIO_FATAL}},
        {AWSLevel::Error, {DB::LogsLevel::error, Poco::Message::PRIO_ERROR}},
        {AWSLevel::Warn, {DB::LogsLevel::warning, Poco::Message::PRIO_WARNING}},
        {AWSLevel::Info, {DB::LogsLevel::information, Poco::Message::PRIO_INFORMATION}},
        /// Both Debug and Trace of the SDK are reported at trace level.
        {AWSLevel::Debug, {DB::LogsLevel::trace, Poco::Message::PRIO_TRACE}},
        {AWSLevel::Trace, {DB::LogsLevel::trace, Poco::Message::PRIO_TRACE}},
    };

    return levels_table.at(log_level);
}
class AWSLogger final : public Aws::Utils::Logging::LogSystemInterface
{
public:
AWSLogger()
{
for (auto [tag, name] : S3_LOGGER_TAG_NAMES)
tag_loggers[tag] = &Poco::Logger::get(name);
default_logger = tag_loggers[S3_LOGGER_TAG_NAMES[0][0]];
}
~AWSLogger() final = default;
Aws::Utils::Logging::LogLevel GetLogLevel() const final { return Aws::Utils::Logging::LogLevel::Trace; }
2020-03-18 03:27:32 +00:00
void Log(Aws::Utils::Logging::LogLevel log_level, const char * tag, const char * format_str, ...) final // NOLINT
{
callLogImpl(log_level, tag, format_str); /// FIXME. Variadic arguments?
}
void LogStream(Aws::Utils::Logging::LogLevel log_level, const char * tag, const Aws::OStringStream & message_stream) final
{
callLogImpl(log_level, tag, message_stream.str().c_str());
}
void callLogImpl(Aws::Utils::Logging::LogLevel log_level, const char * tag, const char * message)
{
2020-04-22 05:39:31 +00:00
const auto & [level, prio] = convertLogLevel(log_level);
if (tag_loggers.count(tag) > 0)
{
LOG_IMPL(tag_loggers[tag], level, prio, "{}", message);
}
else
{
LOG_IMPL(default_logger, level, prio, "{}: {}", tag, message);
}
}
void Flush() final {}
private:
Poco::Logger * default_logger;
std::unordered_map<String, Poco::Logger *> tag_loggers;
};
2021-03-30 00:32:26 +00:00
class AWSEC2MetadataClient : public Aws::Internal::AWSHttpResourceClient
{
2021-03-30 00:32:26 +00:00
static constexpr char EC2_SECURITY_CREDENTIALS_RESOURCE[] = "/latest/meta-data/iam/security-credentials";
static constexpr char EC2_IMDS_TOKEN_RESOURCE[] = "/latest/api/token";
static constexpr char EC2_IMDS_TOKEN_HEADER[] = "x-aws-ec2-metadata-token";
static constexpr char EC2_IMDS_TOKEN_TTL_DEFAULT_VALUE[] = "21600";
static constexpr char EC2_IMDS_TOKEN_TTL_HEADER[] = "x-aws-ec2-metadata-token-ttl-seconds";
2021-03-29 07:13:53 +00:00
public:
2021-03-30 00:32:26 +00:00
/// See EC2MetadataClient.
AWSEC2MetadataClient(const Aws::Client::ClientConfiguration & client_configuration)
: Aws::Internal::AWSHttpResourceClient(client_configuration)
, logger(&Poco::Logger::get("AWSEC2InstanceProfileConfigLoader"))
{
}
2021-03-30 00:32:26 +00:00
AWSEC2MetadataClient& operator =(const AWSEC2MetadataClient & rhs) = delete;
AWSEC2MetadataClient(const AWSEC2MetadataClient & rhs) = delete;
AWSEC2MetadataClient& operator =(const AWSEC2MetadataClient && rhs) = delete;
AWSEC2MetadataClient(const AWSEC2MetadataClient && rhs) = delete;
virtual ~AWSEC2MetadataClient() override
{
}
using Aws::Internal::AWSHttpResourceClient::GetResource;
virtual Aws::String GetResource(const char * resource_path) const
{
return GetResource(endpoint.c_str(), resource_path, nullptr/*authToken*/);
}
virtual Aws::String GetDefaultCredentials() const
{
2021-03-30 00:32:26 +00:00
std::unique_lock<std::recursive_mutex> locker(token_mutex);
LOG_TRACE(logger, "Getting default credentials for ec2 instance.");
auto result = GetResourceWithAWSWebServiceResult(endpoint.c_str(), EC2_SECURITY_CREDENTIALS_RESOURCE, nullptr);
Aws::String credentialsString = result.GetPayload();
auto httpResponseCode = result.GetResponseCode();
if (httpResponseCode == Aws::Http::HttpResponseCode::UNAUTHORIZED)
{
2021-03-30 00:32:26 +00:00
return {};
}
locker.unlock();
Aws::String trimmedCredentialsString = Aws::Utils::StringUtils::Trim(credentialsString.c_str());
if (trimmedCredentialsString.empty()) return {};
Aws::Vector<Aws::String> securityCredentials = Aws::Utils::StringUtils::Split(trimmedCredentialsString, '\n');
LOG_DEBUG(logger, "Calling EC2MetadataService resource, {} returned credential string {}.",
EC2_SECURITY_CREDENTIALS_RESOURCE, trimmedCredentialsString);
if (securityCredentials.size() == 0)
{
LOG_WARNING(logger, "Initial call to ec2Metadataservice to get credentials failed.");
return {};
}
Aws::StringStream ss;
ss << EC2_SECURITY_CREDENTIALS_RESOURCE << "/" << securityCredentials[0];
LOG_DEBUG(logger, "Calling EC2MetadataService resource {}.", ss.str());
return GetResource(ss.str().c_str());
}
static Aws::String AWSComputeUserAgentString()
{
Aws::StringStream ss;
ss << "aws-sdk-cpp/" << Aws::Version::GetVersionString() << " " << Aws::OSVersionInfo::ComputeOSVersionString()
<< " " << Aws::Version::GetCompilerVersionString();
return ss.str();
}
virtual Aws::String GetDefaultCredentialsSecurely() const
{
std::unique_lock<std::recursive_mutex> locker(token_mutex);
Aws::StringStream ss;
ss << endpoint << EC2_IMDS_TOKEN_RESOURCE;
std::shared_ptr<Aws::Http::HttpRequest> tokenRequest(Aws::Http::CreateHttpRequest(ss.str(), Aws::Http::HttpMethod::HTTP_PUT,
Aws::Utils::Stream::DefaultResponseStreamFactoryMethod));
tokenRequest->SetHeaderValue(EC2_IMDS_TOKEN_TTL_HEADER, EC2_IMDS_TOKEN_TTL_DEFAULT_VALUE);
auto userAgentString = AWSComputeUserAgentString();
tokenRequest->SetUserAgent(userAgentString);
LOG_TRACE(logger, "Calling EC2MetadataService to get token.");
auto result = GetResourceWithAWSWebServiceResult(tokenRequest);
Aws::String tokenString = result.GetPayload();
Aws::String trimmedTokenString = Aws::Utils::StringUtils::Trim(tokenString.c_str());
if (result.GetResponseCode() == Aws::Http::HttpResponseCode::BAD_REQUEST)
{
return {};
}
else if (result.GetResponseCode() != Aws::Http::HttpResponseCode::OK || trimmedTokenString.empty())
{
LOG_TRACE(logger, "Calling EC2MetadataService to get token failed, falling back to less secure way.");
return GetDefaultCredentials();
}
token = trimmedTokenString;
locker.unlock();
ss.str("");
ss << endpoint << EC2_SECURITY_CREDENTIALS_RESOURCE;
std::shared_ptr<Aws::Http::HttpRequest> profileRequest(Aws::Http::CreateHttpRequest(ss.str(), Aws::Http::HttpMethod::HTTP_GET,
Aws::Utils::Stream::DefaultResponseStreamFactoryMethod));
profileRequest->SetHeaderValue(EC2_IMDS_TOKEN_HEADER, trimmedTokenString);
profileRequest->SetUserAgent(userAgentString);
Aws::String profileString = GetResourceWithAWSWebServiceResult(profileRequest).GetPayload();
Aws::String trimmedProfileString = Aws::Utils::StringUtils::Trim(profileString.c_str());
Aws::Vector<Aws::String> securityCredentials = Aws::Utils::StringUtils::Split(trimmedProfileString, '\n');
LOG_DEBUG(logger, "Calling EC2MetadataService resource, {} with token returned profile string {}.",
EC2_SECURITY_CREDENTIALS_RESOURCE, trimmedProfileString);
if (securityCredentials.size() == 0)
{
LOG_WARNING(logger, "Calling EC2Metadataservice to get profiles failed.");
return {};
}
2021-03-30 00:32:26 +00:00
ss.str("");
ss << endpoint << EC2_SECURITY_CREDENTIALS_RESOURCE << "/" << securityCredentials[0];
std::shared_ptr<Aws::Http::HttpRequest> credentialsRequest(Aws::Http::CreateHttpRequest(ss.str(), Aws::Http::HttpMethod::HTTP_GET,
Aws::Utils::Stream::DefaultResponseStreamFactoryMethod));
credentialsRequest->SetHeaderValue(EC2_IMDS_TOKEN_HEADER, trimmedTokenString);
credentialsRequest->SetUserAgent(userAgentString);
LOG_DEBUG(logger, "Calling EC2MetadataService resource {} with token.", ss.str());
return GetResourceWithAWSWebServiceResult(credentialsRequest).GetPayload();
}
virtual Aws::String GetCurrentRegion() const
{
return Aws::Region::AWS_GLOBAL;
}
private:
2021-03-30 00:32:26 +00:00
Aws::String endpoint = "http://169.254.169.254";
mutable std::recursive_mutex token_mutex;
mutable Aws::String token;
mutable Aws::String region;
Poco::Logger * logger;
};
2021-03-30 00:32:26 +00:00
class AWSEC2InstanceProfileConfigLoader : public Aws::Config::AWSProfileConfigLoader
{
public:
AWSEC2InstanceProfileConfigLoader(const std::shared_ptr<AWSEC2MetadataClient> & client_, bool use_secure_pull_)
: client(client_)
, use_secure_pull(use_secure_pull_)
, logger(&Poco::Logger::get("AWSEC2InstanceProfileConfigLoader"))
{
}
2021-03-30 00:32:26 +00:00
virtual ~AWSEC2InstanceProfileConfigLoader() override
{
}
protected:
virtual bool LoadInternal() override
{
auto credentials_str = use_secure_pull ? client->GetDefaultCredentialsSecurely() : client->GetDefaultCredentials();
/// See EC2InstanceProfileConfigLoader.
if (credentials_str.empty()) return false;
Aws::Utils::Json::JsonValue credentialsDoc(credentials_str);
if (!credentialsDoc.WasParseSuccessful())
{
LOG_ERROR(logger, "Failed to parse output from EC2MetadataService.");
return false;
}
Aws::String accessKey, secretKey, token;
auto credentialsView = credentialsDoc.View();
accessKey = credentialsView.GetString("AccessKeyId");
LOG_ERROR(logger, "Successfully pulled credentials from metadata service with access key {}.", accessKey);
secretKey = credentialsView.GetString("SecretAccessKey");
token = credentialsView.GetString("Token");
auto region = client->GetCurrentRegion();
Aws::Config::Profile profile;
profile.SetCredentials(Aws::Auth::AWSCredentials(accessKey, secretKey, token));
profile.SetRegion(region);
profile.SetName(Aws::Config::INSTANCE_PROFILE_KEY);
m_profiles[Aws::Config::INSTANCE_PROFILE_KEY] = profile;
return true;
}
private:
std::shared_ptr<AWSEC2MetadataClient> client;
bool use_secure_pull;
Poco::Logger * logger;
};
class AWSInstanceProfileCredentialsProvider : public Aws::Auth::AWSCredentialsProvider
{
public:
2021-03-30 00:32:26 +00:00
/// See InstanceProfileCredentialsProvider.
AWSInstanceProfileCredentialsProvider(const std::shared_ptr<AWSEC2InstanceProfileConfigLoader> & config_loader)
: ec2_metadata_config_loader(config_loader)
, load_frequency_ms(Aws::Auth::REFRESH_THRESHOLD)
, logger(&Poco::Logger::get("AWSInstanceProfileCredentialsProvider"))
{
2021-03-30 00:32:26 +00:00
LOG_INFO(logger, "Creating Instance with injected EC2MetadataClient and refresh rate {}.");
}
Aws::Auth::AWSCredentials GetAWSCredentials() override
{
RefreshIfExpired();
Aws::Utils::Threading::ReaderLockGuard guard(m_reloadLock);
auto profileIter = ec2_metadata_config_loader->GetProfiles().find(Aws::Config::INSTANCE_PROFILE_KEY);
2021-03-30 00:32:26 +00:00
if (profileIter != ec2_metadata_config_loader->GetProfiles().end())
{
2021-03-30 00:32:26 +00:00
return profileIter->second.GetCredentials();
}
2021-03-30 00:32:26 +00:00
return Aws::Auth::AWSCredentials();
}
2021-03-30 00:32:26 +00:00
protected:
void Reload() override
{
LOG_INFO(logger, "Credentials have expired attempting to repull from EC2 Metadata Service.");
ec2_metadata_config_loader->Load();
AWSCredentialsProvider::Reload();
}
2021-03-30 00:32:26 +00:00
private:
void RefreshIfExpired()
{
LOG_DEBUG(logger, "Checking if latest credential pull has expired.");
Aws::Utils::Threading::ReaderLockGuard guard(m_reloadLock);
if (!IsTimeToRefresh(load_frequency_ms))
{
return;
}
guard.UpgradeToWriterLock();
if (!IsTimeToRefresh(load_frequency_ms)) // double-checked lock to avoid refreshing twice
{
return;
}
2021-03-30 00:32:26 +00:00
Reload();
}
std::shared_ptr<AWSEC2InstanceProfileConfigLoader> ec2_metadata_config_loader;
long load_frequency_ms;
Poco::Logger * logger;
};
class S3CredentialsProviderChain : public Aws::Auth::AWSCredentialsProviderChain
{
public:
explicit S3CredentialsProviderChain(const DB::S3::PocoHTTPClientConfiguration & configuration, const Aws::Auth::AWSCredentials & credentials, bool use_environment_credentials, bool use_insecure_imds_request)
{
auto * logger = &Poco::Logger::get("S3CredentialsProviderChain");
if (use_environment_credentials)
{
static const char AWS_ECS_CONTAINER_CREDENTIALS_RELATIVE_URI[] = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI";
static const char AWS_ECS_CONTAINER_CREDENTIALS_FULL_URI[] = "AWS_CONTAINER_CREDENTIALS_FULL_URI";
static const char AWS_ECS_CONTAINER_AUTHORIZATION_TOKEN[] = "AWS_CONTAINER_AUTHORIZATION_TOKEN";
static const char AWS_EC2_METADATA_DISABLED[] = "AWS_EC2_METADATA_DISABLED";
/// The only difference from DefaultAWSCredentialsProviderChain::DefaultAWSCredentialsProviderChain()
/// is that this chain uses custom ClientConfiguration.
AddProvider(std::make_shared<Aws::Auth::EnvironmentAWSCredentialsProvider>());
AddProvider(std::make_shared<Aws::Auth::ProfileConfigFileAWSCredentialsProvider>());
AddProvider(std::make_shared<Aws::Auth::ProcessCredentialsProvider>());
AddProvider(std::make_shared<Aws::Auth::STSAssumeRoleWebIdentityCredentialsProvider>());
/// ECS TaskRole Credentials only available when ENVIRONMENT VARIABLE is set.
const auto relative_uri = Aws::Environment::GetEnv(AWS_ECS_CONTAINER_CREDENTIALS_RELATIVE_URI);
LOG_DEBUG(logger, "The environment variable value {} is {}", AWS_ECS_CONTAINER_CREDENTIALS_RELATIVE_URI,
relative_uri);
const auto absolute_uri = Aws::Environment::GetEnv(AWS_ECS_CONTAINER_CREDENTIALS_FULL_URI);
LOG_DEBUG(logger, "The environment variable value {} is {}", AWS_ECS_CONTAINER_CREDENTIALS_FULL_URI,
absolute_uri);
const auto ec2_metadata_disabled = Aws::Environment::GetEnv(AWS_EC2_METADATA_DISABLED);
LOG_DEBUG(logger, "The environment variable value {} is {}", AWS_EC2_METADATA_DISABLED,
ec2_metadata_disabled);
if (!relative_uri.empty())
{
AddProvider(std::make_shared<Aws::Auth::TaskRoleCredentialsProvider>(relative_uri.c_str()));
LOG_INFO(logger, "Added ECS metadata service credentials provider with relative path: [{}] to the provider chain.",
relative_uri);
}
else if (!absolute_uri.empty())
{
const auto token = Aws::Environment::GetEnv(AWS_ECS_CONTAINER_AUTHORIZATION_TOKEN);
AddProvider(std::make_shared<Aws::Auth::TaskRoleCredentialsProvider>(absolute_uri.c_str(), token.c_str()));
/// DO NOT log the value of the authorization token for security purposes.
LOG_INFO(logger, "Added ECS credentials provider with URI: [{}] to the provider chain with a{} authorization token.",
absolute_uri, token.empty() ? "n empty" : " non-empty");
}
else if (Aws::Utils::StringUtils::ToLower(ec2_metadata_disabled.c_str()) != "true")
{
DB::S3::PocoHTTPClientConfiguration aws_client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(configuration.remote_host_filter, configuration.s3_max_redirects);
/// See MakeDefaultHttpResourceClientConfiguration().
/// This is part of EC2 metadata client, but unfortunately it can't be accessed from outside
/// of contrib/aws/aws-cpp-sdk-core/source/internal/AWSHttpResourceClient.cpp
aws_client_configuration.maxConnections = 2;
aws_client_configuration.scheme = Aws::Http::Scheme::HTTP;
/// Explicitly set the proxy settings to empty/zero to avoid relying on defaults that could potentially change
/// in the future.
aws_client_configuration.proxyHost = "";
aws_client_configuration.proxyUserName = "";
aws_client_configuration.proxyPassword = "";
aws_client_configuration.proxyPort = 0;
/// EC2MetadataService throttles by delaying the response so the service client should set a large read timeout.
/// EC2MetadataService delay is in order of seconds so it only make sense to retry after a couple of seconds.
aws_client_configuration.connectTimeoutMs = 1000;
2021-03-30 00:32:26 +00:00
/// FIXME. Somehow this timeout does not work in docker without --net=host.
aws_client_configuration.requestTimeoutMs = 1000;
2021-03-30 00:32:26 +00:00
aws_client_configuration.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(1, 1000);
2021-03-30 00:32:26 +00:00
auto ec2_metadata_client = std::make_shared<AWSEC2MetadataClient>(aws_client_configuration);
auto config_loader = std::make_shared<AWSEC2InstanceProfileConfigLoader>(ec2_metadata_client, !use_insecure_imds_request);
2021-03-30 00:32:26 +00:00
AddProvider(std::make_shared<AWSInstanceProfileCredentialsProvider>(config_loader));
LOG_INFO(logger, "Added EC2 metadata service credentials provider to the provider chain.");
}
}
AddProvider(std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(credentials));
}
};
class S3AuthSigner : public Aws::Client::AWSAuthV4Signer
{
public:
S3AuthSigner(
2020-06-04 13:48:20 +00:00
const Aws::Client::ClientConfiguration & client_configuration,
const Aws::Auth::AWSCredentials & credentials,
const DB::HeaderCollection & headers_,
bool use_environment_credentials,
2021-03-30 00:32:26 +00:00
bool use_insecure_imds_request)
: Aws::Client::AWSAuthV4Signer(
std::make_shared<S3CredentialsProviderChain>(
static_cast<const DB::S3::PocoHTTPClientConfiguration &>(client_configuration),
credentials,
use_environment_credentials,
2021-03-30 00:32:26 +00:00
use_insecure_imds_request),
"s3",
2020-06-04 13:48:20 +00:00
client_configuration.region,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
false)
, headers(headers_)
{
}
2020-06-04 13:48:20 +00:00
bool SignRequest(Aws::Http::HttpRequest & request, const char * region, bool sign_body) const override
{
2020-06-04 13:48:20 +00:00
auto result = Aws::Client::AWSAuthV4Signer::SignRequest(request, region, sign_body);
for (const auto & header : headers)
request.SetHeaderValue(header.name, header.value);
return result;
}
2021-01-26 07:49:16 +00:00
bool SignRequest(Aws::Http::HttpRequest & request, const char * region, const char * service_name, bool sign_body) const override
{
auto result = Aws::Client::AWSAuthV4Signer::SignRequest(request, region, service_name, sign_body);
for (const auto & header : headers)
request.SetHeaderValue(header.name, header.value);
return result;
}
bool PresignRequest(
Aws::Http::HttpRequest & request,
const char * region,
long long expiration_time_sec) const override // NOLINT
{
auto result = Aws::Client::AWSAuthV4Signer::PresignRequest(request, region, expiration_time_sec);
for (const auto & header : headers)
request.SetHeaderValue(header.name, header.value);
return result;
}
bool PresignRequest(
Aws::Http::HttpRequest & request,
const char * region,
2021-01-26 07:49:16 +00:00
const char * service_name,
2020-06-04 13:48:20 +00:00
long long expiration_time_sec) const override // NOLINT
{
2021-01-26 07:49:16 +00:00
auto result = Aws::Client::AWSAuthV4Signer::PresignRequest(request, region, service_name, expiration_time_sec);
for (const auto & header : headers)
request.SetHeaderValue(header.name, header.value);
return result;
}
private:
const DB::HeaderCollection headers;
};
}
2019-12-06 15:14:39 +00:00
namespace DB
{
namespace ErrorCodes
{
2019-12-03 16:23:24 +00:00
extern const int BAD_ARGUMENTS;
2019-12-03 01:22:25 +00:00
}
2019-12-06 15:14:39 +00:00
namespace S3
{
2019-12-06 14:48:56 +00:00
/// Initializes the AWS SDK with default options, then installs AWSLogger as the SDK log system
/// and PocoHTTPClientFactory as the SDK HTTP client factory.
ClientFactory::ClientFactory()
{
    aws_options = Aws::SDKOptions{};
    Aws::InitAPI(aws_options);

    std::shared_ptr<Aws::Utils::Logging::LogSystemInterface> sdk_log_system = std::make_shared<AWSLogger>();
    Aws::Utils::Logging::InitializeAWSLogging(sdk_log_system);

    std::shared_ptr<Aws::Http::HttpClientFactory> http_client_factory = std::make_shared<PocoHTTPClientFactory>();
    Aws::Http::SetHttpClientFactory(http_client_factory);
}
2019-12-06 14:48:56 +00:00
/// Shuts down what the constructor set up: first the SDK logging, then the SDK itself.
ClientFactory::~ClientFactory()
{
    Aws::Utils::Logging::ShutdownAWSLogging();
    Aws::ShutdownAPI(aws_options);
}
2019-11-05 07:54:13 +00:00
2019-12-10 23:03:45 +00:00
/// Returns the process-wide factory, which is created on the first call.
ClientFactory & ClientFactory::instance()
{
    static ClientFactory factory;
    return factory;
}
2019-12-03 16:23:24 +00:00
std::shared_ptr<Aws::S3::S3Client> ClientFactory::create( // NOLINT
const PocoHTTPClientConfiguration & cfg_,
bool is_virtual_hosted_style,
const String & access_key_id,
const String & secret_access_key,
2021-01-28 06:32:41 +00:00
const String & server_side_encryption_customer_key_base64,
HeaderCollection headers,
bool use_environment_credentials,
2021-03-30 00:32:26 +00:00
bool use_insecure_imds_request)
{
PocoHTTPClientConfiguration client_configuration = cfg_;
2020-10-06 08:20:47 +00:00
client_configuration.updateSchemeAndRegion();
Aws::Auth::AWSCredentials credentials(access_key_id, secret_access_key);
2021-01-28 06:32:41 +00:00
if (!server_side_encryption_customer_key_base64.empty())
{
/// See S3Client::GeneratePresignedUrlWithSSEC().
headers.push_back({Aws::S3::SSEHeaders::SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
Aws::S3::Model::ServerSideEncryptionMapper::GetNameForServerSideEncryption(Aws::S3::Model::ServerSideEncryption::AES256)});
headers.push_back({Aws::S3::SSEHeaders::SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
server_side_encryption_customer_key_base64});
Aws::Utils::ByteBuffer buffer = Aws::Utils::HashingUtils::Base64Decode(server_side_encryption_customer_key_base64);
String str_buffer(reinterpret_cast<char *>(buffer.GetUnderlyingData()), buffer.GetLength());
headers.push_back({Aws::S3::SSEHeaders::SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
Aws::Utils::HashingUtils::Base64Encode(Aws::Utils::HashingUtils::CalculateMD5(str_buffer))});
}
auto auth_signer = std::make_shared<S3AuthSigner>(
client_configuration,
std::move(credentials),
std::move(headers),
use_environment_credentials,
2021-03-30 00:32:26 +00:00
use_insecure_imds_request);
2021-01-28 06:32:41 +00:00
return std::make_shared<Aws::S3::S3Client>(
std::move(auth_signer),
2020-10-06 08:20:47 +00:00
std::move(client_configuration), // Client configuration.
is_virtual_hosted_style || client_configuration.endpointOverride.empty() // Use virtual addressing only if endpoint is not specified.
);
}
2021-01-27 20:05:41 +00:00
/// Builds a PocoHTTPClientConfiguration from the given host filter and redirect limit.
PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT
    const RemoteHostFilter & remote_host_filter,
    unsigned int s3_max_redirects)
{
    PocoHTTPClientConfiguration configuration(remote_host_filter, s3_max_redirects);
    return configuration;
}
2020-04-29 08:45:13 +00:00
/// Splits an S3 URL into endpoint, bucket and key.
/// Both the virtual hosted style (bucket in the host name) and the path style (bucket as the
/// first path segment) are accepted. Throws Exception with BAD_ARGUMENTS if the host is empty,
/// the bucket name is not 3..63 characters long, the key is empty, or neither style matches.
URI::URI(const Poco::URI & uri_)
{
    /// Case when bucket name represented in domain name of S3 URL.
    /// E.g. (https://bucket-name.s3.Region.amazonaws.com/key)
    /// https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html#virtual-hosted-style-access
    static const RE2 virtual_hosted_style_pattern(R"((.+)\.(s3|cos)([.\-][a-z0-9\-.:]+))");

    /// Case when bucket name and key represented in path of S3 URL.
    /// E.g. (https://s3.Region.amazonaws.com/bucket-name/key)
    /// https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html#path-style-access
    static const RE2 path_style_pattern("^/([^/]*)/(.*)");

    static constexpr auto S3 = "S3";
    static constexpr auto COSN = "COSN";
    static constexpr auto COS = "COS";

    uri = uri_;
    storage_name = S3;

    if (uri.getHost().empty())
        throw Exception("Host is empty in S3 URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);

    /// S3 specification requires at least 3 and at most 63 characters in bucket name.
    /// https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-s3-bucket-naming-requirements.html
    const auto check_bucket_length = [this](const String & style)
    {
        if (bucket.length() < 3 || bucket.length() > 63)
            throw Exception(
                "Bucket name length is out of bounds in " + style + " S3 URI: " + bucket + " (" + uri.toString() + ")", ErrorCodes::BAD_ARGUMENTS);
    };

    const auto check_key_not_empty = [this](const String & style)
    {
        if (key.empty() || key == "/")
            throw Exception("Key name is empty in " + style + " S3 URI: " + key + " (" + uri.toString() + ")", ErrorCodes::BAD_ARGUMENTS);
    };

    String name;
    String endpoint_authority_from_uri;

    if (re2::RE2::FullMatch(uri.getAuthority(), virtual_hosted_style_pattern, &bucket, &name, &endpoint_authority_from_uri))
    {
        is_virtual_hosted_style = true;
        endpoint = uri.getScheme() + "://" + name + endpoint_authority_from_uri;

        check_bucket_length("virtual hosted style");

        /// Remove leading '/' from path to extract key.
        if (!uri.getPath().empty())
            key = uri.getPath().substr(1);

        check_key_not_empty("virtual hosted style");

        /// The storage system name is compared in upper case; the endpoint above keeps the original case.
        boost::to_upper(name);
        if (name != S3 && name != COS)
        {
            throw Exception("Object storage system name is unrecognized in virtual hosted style S3 URI: " + name + " (" + uri.toString() + ")", ErrorCodes::BAD_ARGUMENTS);
        }

        storage_name = (name == S3) ? name : String(COSN);
        return;
    }

    if (!re2::RE2::PartialMatch(uri.getPath(), path_style_pattern, &bucket, &key))
        throw Exception("Bucket or key name are invalid in S3 URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);

    is_virtual_hosted_style = false;
    endpoint = uri.getScheme() + "://" + uri.getAuthority();

    check_bucket_length("path style");
    check_key_not_empty("path style");
}
2019-11-05 07:54:13 +00:00
}
}
2019-12-06 14:37:21 +00:00
#endif