mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 16:12:01 +00:00
Merge remote-tracking branch 'origin/master' into insert-keeper-retries-doc
This commit is contained in:
commit
4cb760c312
@ -52,11 +52,13 @@ template <typename T> T getConfigValueOrDefault(
|
||||
return config.getInt64(path);
|
||||
else if constexpr (std::is_same_v<T, Float64>)
|
||||
return config.getDouble(path);
|
||||
else if constexpr (std::is_same_v<T, bool>)
|
||||
return config.getBool(path);
|
||||
else
|
||||
throw Exception(
|
||||
ErrorCodes::NOT_IMPLEMENTED,
|
||||
"Unsupported type in getConfigValueOrDefault(). "
|
||||
"Supported types are String, UInt64, Int64, Float64");
|
||||
"Supported types are String, UInt64, Int64, Float64, bool");
|
||||
}
|
||||
catch (const Poco::SyntaxException &)
|
||||
{
|
||||
@ -85,11 +87,13 @@ template<typename T> void setConfigValue(
|
||||
config.setInt64(path, value);
|
||||
else if constexpr (std::is_same_v<T, Float64>)
|
||||
config.setDouble(path, value);
|
||||
else if constexpr (std::is_same_v<T, bool>)
|
||||
config.setBool(path, value);
|
||||
else
|
||||
throw Exception(
|
||||
ErrorCodes::NOT_IMPLEMENTED,
|
||||
"Unsupported type in setConfigValue(). "
|
||||
"Supported types are String, UInt64, Int64, Float64");
|
||||
"Supported types are String, UInt64, Int64, Float64, bool");
|
||||
}
|
||||
|
||||
template <typename T> void copyConfigValue(
|
||||
@ -206,6 +210,8 @@ template Int64 getConfigValue<Int64>(const Poco::Util::AbstractConfiguration & c
|
||||
const std::string & path);
|
||||
template Float64 getConfigValue<Float64>(const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & path);
|
||||
template bool getConfigValue<bool>(const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & path);
|
||||
|
||||
template String getConfigValueOrDefault<String>(const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & path, const String * default_value);
|
||||
@ -215,6 +221,8 @@ template Int64 getConfigValueOrDefault<Int64>(const Poco::Util::AbstractConfigur
|
||||
const std::string & path, const Int64 * default_value);
|
||||
template Float64 getConfigValueOrDefault<Float64>(const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & path, const Float64 * default_value);
|
||||
template bool getConfigValueOrDefault<bool>(const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & path, const bool * default_value);
|
||||
|
||||
template void setConfigValue<String>(Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & path, const String & value, bool update);
|
||||
@ -224,6 +232,8 @@ template void setConfigValue<Int64>(Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & path, const Int64 & value, bool update);
|
||||
template void setConfigValue<Float64>(Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & path, const Float64 & value, bool update);
|
||||
template void setConfigValue<bool>(Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & path, const bool & value, bool update);
|
||||
|
||||
template void copyConfigValue<String>(const Poco::Util::AbstractConfiguration & from_config, const std::string & from_path,
|
||||
Poco::Util::AbstractConfiguration & to_config, const std::string & to_path);
|
||||
@ -233,6 +243,8 @@ template void copyConfigValue<Int64>(const Poco::Util::AbstractConfiguration & f
|
||||
Poco::Util::AbstractConfiguration & to_config, const std::string & to_path);
|
||||
template void copyConfigValue<Float64>(const Poco::Util::AbstractConfiguration & from_config, const std::string & from_path,
|
||||
Poco::Util::AbstractConfiguration & to_config, const std::string & to_path);
|
||||
template void copyConfigValue<bool>(const Poco::Util::AbstractConfiguration & from_config, const std::string & from_path,
|
||||
Poco::Util::AbstractConfiguration & to_config, const std::string & to_path);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -436,11 +436,13 @@ template String NamedCollection::get<String>(const NamedCollection::Key & key) c
|
||||
template UInt64 NamedCollection::get<UInt64>(const NamedCollection::Key & key) const;
|
||||
template Int64 NamedCollection::get<Int64>(const NamedCollection::Key & key) const;
|
||||
template Float64 NamedCollection::get<Float64>(const NamedCollection::Key & key) const;
|
||||
template bool NamedCollection::get<bool>(const NamedCollection::Key & key) const;
|
||||
|
||||
template String NamedCollection::getOrDefault<String>(const NamedCollection::Key & key, const String & default_value) const;
|
||||
template UInt64 NamedCollection::getOrDefault<UInt64>(const NamedCollection::Key & key, const UInt64 & default_value) const;
|
||||
template Int64 NamedCollection::getOrDefault<Int64>(const NamedCollection::Key & key, const Int64 & default_value) const;
|
||||
template Float64 NamedCollection::getOrDefault<Float64>(const NamedCollection::Key & key, const Float64 & default_value) const;
|
||||
template bool NamedCollection::getOrDefault<bool>(const NamedCollection::Key & key, const bool & default_value) const;
|
||||
|
||||
template void NamedCollection::set<String, true>(const NamedCollection::Key & key, const String & value);
|
||||
template void NamedCollection::set<String, false>(const NamedCollection::Key & key, const String & value);
|
||||
@ -450,6 +452,7 @@ template void NamedCollection::set<Int64, true>(const NamedCollection::Key & key
|
||||
template void NamedCollection::set<Int64, false>(const NamedCollection::Key & key, const Int64 & value);
|
||||
template void NamedCollection::set<Float64, true>(const NamedCollection::Key & key, const Float64 & value);
|
||||
template void NamedCollection::set<Float64, false>(const NamedCollection::Key & key, const Float64 & value);
|
||||
template void NamedCollection::set<bool, false>(const NamedCollection::Key & key, const bool & value);
|
||||
|
||||
template void NamedCollection::setOrUpdate<String, true>(const NamedCollection::Key & key, const String & value);
|
||||
template void NamedCollection::setOrUpdate<String, false>(const NamedCollection::Key & key, const String & value);
|
||||
@ -459,6 +462,7 @@ template void NamedCollection::setOrUpdate<Int64, true>(const NamedCollection::K
|
||||
template void NamedCollection::setOrUpdate<Int64, false>(const NamedCollection::Key & key, const Int64 & value);
|
||||
template void NamedCollection::setOrUpdate<Float64, true>(const NamedCollection::Key & key, const Float64 & value);
|
||||
template void NamedCollection::setOrUpdate<Float64, false>(const NamedCollection::Key & key, const Float64 & value);
|
||||
template void NamedCollection::setOrUpdate<bool, false>(const NamedCollection::Key & key, const bool & value);
|
||||
|
||||
template void NamedCollection::remove<true>(const Key & key);
|
||||
template void NamedCollection::remove<false>(const Key & key);
|
||||
|
@ -326,7 +326,6 @@ The server successfully detected this situation and will download merged part fr
|
||||
M(S3ListObjects, "Number of S3 API ListObjects calls.") \
|
||||
M(S3HeadObject, "Number of S3 API HeadObject calls.") \
|
||||
M(S3GetObjectAttributes, "Number of S3 API GetObjectAttributes calls.") \
|
||||
M(S3GetObjectMetadata, "Number of S3 API GetObject calls for getting metadata.") \
|
||||
M(S3CreateMultipartUpload, "Number of S3 API CreateMultipartUpload calls.") \
|
||||
M(S3UploadPartCopy, "Number of S3 API UploadPartCopy calls.") \
|
||||
M(S3UploadPart, "Number of S3 API UploadPart calls.") \
|
||||
@ -340,7 +339,6 @@ The server successfully detected this situation and will download merged part fr
|
||||
M(DiskS3ListObjects, "Number of DiskS3 API ListObjects calls.") \
|
||||
M(DiskS3HeadObject, "Number of DiskS3 API HeadObject calls.") \
|
||||
M(DiskS3GetObjectAttributes, "Number of DiskS3 API GetObjectAttributes calls.") \
|
||||
M(DiskS3GetObjectMetadata, "Number of DiskS3 API GetObject calls for getting metadata.") \
|
||||
M(DiskS3CreateMultipartUpload, "Number of DiskS3 API CreateMultipartUpload calls.") \
|
||||
M(DiskS3UploadPartCopy, "Number of DiskS3 API UploadPartCopy calls.") \
|
||||
M(DiskS3UploadPart, "Number of DiskS3 API UploadPart calls.") \
|
||||
|
@ -6,7 +6,7 @@
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/setThreadName.h>
|
||||
|
||||
#include <IO/S3Common.h>
|
||||
#include <IO/S3/getObjectInfo.h>
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
#include <IO/SeekAvoidingReadBuffer.h>
|
||||
#include <IO/S3/getObjectInfo.h>
|
||||
#include <IO/S3/copyS3File.h>
|
||||
#include <Interpreters/threadPoolCallbackRunner.h>
|
||||
#include <Disks/ObjectStorages/S3/diskSettings.h>
|
||||
@ -109,7 +110,8 @@ std::string S3ObjectStorage::generateBlobNameForPath(const std::string & /* path
|
||||
|
||||
bool S3ObjectStorage::exists(const StoredObject & object) const
|
||||
{
|
||||
return S3::objectExists(*client.get(), bucket, object.absolute_path, {}, /* for_disk_s3= */ true);
|
||||
auto settings_ptr = s3_settings.get();
|
||||
return S3::objectExists(*client.get(), bucket, object.absolute_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
||||
@ -384,12 +386,13 @@ void S3ObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
|
||||
|
||||
ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) const
|
||||
{
|
||||
ObjectMetadata result;
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
|
||||
|
||||
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, /* for_disk_s3= */ true);
|
||||
ObjectMetadata result;
|
||||
result.size_bytes = object_info.size;
|
||||
result.last_modified = object_info.last_modification_time;
|
||||
result.attributes = S3::getObjectMetadata(*client.get(), bucket, path, {}, /* for_disk_s3= */ true);
|
||||
result.attributes = object_info.metadata;
|
||||
|
||||
return result;
|
||||
}
|
||||
@ -404,8 +407,8 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
|
||||
if (auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to); dest_s3 != nullptr)
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
auto size = S3::getObjectSize(*client_ptr, bucket, object_from.absolute_path, {}, /* for_disk_s3= */ true);
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto size = S3::getObjectSize(*client_ptr, bucket, object_from.absolute_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
|
||||
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
|
||||
copyS3File(client_ptr, bucket, object_from.absolute_path, 0, size, dest_s3->bucket, object_to.absolute_path,
|
||||
settings_ptr->request_settings, object_to_attributes, scheduler, /* for_disk_s3= */ true);
|
||||
@ -420,8 +423,8 @@ void S3ObjectStorage::copyObject( // NOLINT
|
||||
const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> object_to_attributes)
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
auto size = S3::getObjectSize(*client_ptr, bucket, object_from.absolute_path, {}, /* for_disk_s3= */ true);
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto size = S3::getObjectSize(*client_ptr, bucket, object_from.absolute_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
|
||||
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
|
||||
copyS3File(client_ptr, bucket, object_from.absolute_path, 0, size, bucket, object_to.absolute_path,
|
||||
settings_ptr->request_settings, object_to_attributes, scheduler, /* for_disk_s3= */ true);
|
||||
|
@ -5,6 +5,7 @@
|
||||
|
||||
#include <IO/ReadBufferFromIStream.h>
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
#include <IO/S3/getObjectInfo.h>
|
||||
|
||||
#include <aws/s3/S3Client.h>
|
||||
#include <aws/s3/model/GetObjectRequest.h>
|
||||
@ -253,7 +254,7 @@ size_t ReadBufferFromS3::getFileSize()
|
||||
if (file_size)
|
||||
return *file_size;
|
||||
|
||||
auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id, /* for_disk_s3= */ read_settings.for_object_storage);
|
||||
auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id, request_settings, /* for_disk_s3= */ read_settings.for_object_storage);
|
||||
|
||||
file_size = object_size;
|
||||
return *file_size;
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <IO/LimitSeekableReadBuffer.h>
|
||||
#include <IO/S3/getObjectInfo.h>
|
||||
#include <IO/SeekableReadBuffer.h>
|
||||
#include <IO/StdStreamFromReadBuffer.h>
|
||||
|
||||
@ -64,9 +65,8 @@ namespace
|
||||
: client_ptr(client_ptr_)
|
||||
, dest_bucket(dest_bucket_)
|
||||
, dest_key(dest_key_)
|
||||
, settings(request_settings_.getUploadSettings())
|
||||
, check_objects_after_upload(request_settings_.check_objects_after_upload)
|
||||
, max_unexpected_write_error_retries(request_settings_.max_unexpected_write_error_retries)
|
||||
, request_settings(request_settings_)
|
||||
, upload_settings(request_settings.getUploadSettings())
|
||||
, object_metadata(object_metadata_)
|
||||
, schedule(schedule_)
|
||||
, for_disk_s3(for_disk_s3_)
|
||||
@ -80,9 +80,8 @@ namespace
|
||||
std::shared_ptr<const Aws::S3::S3Client> client_ptr;
|
||||
const String & dest_bucket;
|
||||
const String & dest_key;
|
||||
const S3Settings::RequestSettings::PartUploadSettings & settings;
|
||||
bool check_objects_after_upload;
|
||||
size_t max_unexpected_write_error_retries;
|
||||
const S3Settings::RequestSettings & request_settings;
|
||||
const S3Settings::RequestSettings::PartUploadSettings & upload_settings;
|
||||
const std::optional<std::map<String, String>> & object_metadata;
|
||||
ThreadPoolCallbackRunner<void> schedule;
|
||||
bool for_disk_s3;
|
||||
@ -119,8 +118,9 @@ namespace
|
||||
if (object_metadata.has_value())
|
||||
request.SetMetadata(object_metadata.value());
|
||||
|
||||
if (!settings.storage_class_name.empty())
|
||||
request.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(settings.storage_class_name));
|
||||
const auto & storage_class_name = upload_settings.storage_class_name;
|
||||
if (!storage_class_name.empty())
|
||||
request.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(storage_class_name));
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::S3CreateMultipartUpload);
|
||||
if (for_disk_s3)
|
||||
@ -161,7 +161,7 @@ namespace
|
||||
|
||||
request.SetMultipartUpload(multipart_upload);
|
||||
|
||||
size_t max_retries = std::max(max_unexpected_write_error_retries, 1UL);
|
||||
size_t max_retries = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
|
||||
for (size_t retries = 1;; ++retries)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3CompleteMultipartUpload);
|
||||
@ -205,7 +205,7 @@ namespace
|
||||
void checkObjectAfterUpload()
|
||||
{
|
||||
LOG_TRACE(log, "Checking object {} exists after upload", dest_key);
|
||||
S3::checkObjectExists(*client_ptr, dest_bucket, dest_key, {}, {}, "Immediately after upload");
|
||||
S3::checkObjectExists(*client_ptr, dest_bucket, dest_key, {}, request_settings, {}, "Immediately after upload");
|
||||
LOG_TRACE(log, "Object {} exists after upload", dest_key);
|
||||
}
|
||||
|
||||
@ -239,47 +239,49 @@ namespace
|
||||
if (!total_size)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chosen multipart upload for an empty file. This must not happen");
|
||||
|
||||
if (!settings.max_part_number)
|
||||
auto max_part_number = upload_settings.max_part_number;
|
||||
auto min_upload_part_size = upload_settings.min_upload_part_size;
|
||||
auto max_upload_part_size = upload_settings.max_upload_part_size;
|
||||
|
||||
if (!max_part_number)
|
||||
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_part_number must not be 0");
|
||||
else if (!settings.min_upload_part_size)
|
||||
else if (!min_upload_part_size)
|
||||
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "min_upload_part_size must not be 0");
|
||||
else if (settings.max_upload_part_size < settings.min_upload_part_size)
|
||||
else if (max_upload_part_size < min_upload_part_size)
|
||||
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_upload_part_size must not be less than min_upload_part_size");
|
||||
|
||||
size_t part_size = settings.min_upload_part_size;
|
||||
size_t part_size = min_upload_part_size;
|
||||
size_t num_parts = (total_size + part_size - 1) / part_size;
|
||||
|
||||
if (num_parts > settings.max_part_number)
|
||||
if (num_parts > max_part_number)
|
||||
{
|
||||
part_size = (total_size + settings.max_part_number - 1) / settings.max_part_number;
|
||||
part_size = (total_size + max_part_number - 1) / max_part_number;
|
||||
num_parts = (total_size + part_size - 1) / part_size;
|
||||
}
|
||||
|
||||
if (part_size > settings.max_upload_part_size)
|
||||
if (part_size > max_upload_part_size)
|
||||
{
|
||||
part_size = settings.max_upload_part_size;
|
||||
part_size = max_upload_part_size;
|
||||
num_parts = (total_size + part_size - 1) / part_size;
|
||||
}
|
||||
|
||||
if (num_parts < 1 || num_parts > settings.max_part_number || part_size < settings.min_upload_part_size
|
||||
|| part_size > settings.max_upload_part_size)
|
||||
if (num_parts < 1 || num_parts > max_part_number || part_size < min_upload_part_size || part_size > max_upload_part_size)
|
||||
{
|
||||
String msg;
|
||||
if (num_parts < 1)
|
||||
msg = "Number of parts is zero";
|
||||
else if (num_parts > settings.max_part_number)
|
||||
msg = fmt::format("Number of parts exceeds {}", num_parts, settings.max_part_number);
|
||||
else if (part_size < settings.min_upload_part_size)
|
||||
msg = fmt::format("Size of a part is less than {}", part_size, settings.min_upload_part_size);
|
||||
else if (num_parts > max_part_number)
|
||||
msg = fmt::format("Number of parts exceeds {}", num_parts, max_part_number);
|
||||
else if (part_size < min_upload_part_size)
|
||||
msg = fmt::format("Size of a part is less than {}", part_size, min_upload_part_size);
|
||||
else
|
||||
msg = fmt::format("Size of a part exceeds {}", part_size, settings.max_upload_part_size);
|
||||
msg = fmt::format("Size of a part exceeds {}", part_size, max_upload_part_size);
|
||||
|
||||
throw Exception(
|
||||
ErrorCodes::INVALID_CONFIG_PARAMETER,
|
||||
"{} while writing {} bytes to S3. Check max_part_number = {}, "
|
||||
"min_upload_part_size = {}, max_upload_part_size = {}, max_single_part_upload_size = {}",
|
||||
msg, total_size, settings.max_part_number, settings.min_upload_part_size,
|
||||
settings.max_upload_part_size, settings.max_single_part_upload_size);
|
||||
"min_upload_part_size = {}, max_upload_part_size = {}",
|
||||
msg, total_size, max_part_number, min_upload_part_size, max_upload_part_size);
|
||||
}
|
||||
|
||||
/// We've calculated the size of a normal part (the final part can be smaller).
|
||||
@ -418,12 +420,12 @@ namespace
|
||||
|
||||
void performCopy()
|
||||
{
|
||||
if (size <= settings.max_single_part_upload_size)
|
||||
if (size <= upload_settings.max_single_part_upload_size)
|
||||
performSinglepartUpload();
|
||||
else
|
||||
performMultipartUpload();
|
||||
|
||||
if (check_objects_after_upload)
|
||||
if (request_settings.check_objects_after_upload)
|
||||
checkObjectAfterUpload();
|
||||
}
|
||||
|
||||
@ -451,8 +453,9 @@ namespace
|
||||
if (object_metadata.has_value())
|
||||
request.SetMetadata(object_metadata.value());
|
||||
|
||||
if (!settings.storage_class_name.empty())
|
||||
request.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(settings.storage_class_name));
|
||||
const auto & storage_class_name = upload_settings.storage_class_name;
|
||||
if (!storage_class_name.empty())
|
||||
request.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(storage_class_name));
|
||||
|
||||
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
|
||||
request.SetContentType("binary/octet-stream");
|
||||
@ -460,7 +463,7 @@ namespace
|
||||
|
||||
void processPutRequest(const Aws::S3::Model::PutObjectRequest & request)
|
||||
{
|
||||
size_t max_retries = std::max(max_unexpected_write_error_retries, 1UL);
|
||||
size_t max_retries = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
|
||||
for (size_t retries = 1;; ++retries)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3PutObject);
|
||||
@ -582,12 +585,12 @@ namespace
|
||||
|
||||
void performCopy()
|
||||
{
|
||||
if (size <= settings.max_single_operation_copy_size)
|
||||
if (size <= upload_settings.max_single_operation_copy_size)
|
||||
performSingleOperationCopy();
|
||||
else
|
||||
performMultipartUploadCopy();
|
||||
|
||||
if (check_objects_after_upload)
|
||||
if (request_settings.check_objects_after_upload)
|
||||
checkObjectAfterUpload();
|
||||
}
|
||||
|
||||
@ -616,8 +619,9 @@ namespace
|
||||
request.SetMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE);
|
||||
}
|
||||
|
||||
if (!settings.storage_class_name.empty())
|
||||
request.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(settings.storage_class_name));
|
||||
const auto & storage_class_name = upload_settings.storage_class_name;
|
||||
if (!storage_class_name.empty())
|
||||
request.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(storage_class_name));
|
||||
|
||||
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
|
||||
request.SetContentType("binary/octet-stream");
|
||||
@ -625,7 +629,7 @@ namespace
|
||||
|
||||
void processCopyRequest(const Aws::S3::Model::CopyObjectRequest & request)
|
||||
{
|
||||
size_t max_retries = std::max(max_unexpected_write_error_retries, 1UL);
|
||||
size_t max_retries = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
|
||||
for (size_t retries = 1;; ++retries)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3CopyObject);
|
||||
|
218
src/IO/S3/getObjectInfo.cpp
Normal file
218
src/IO/S3/getObjectInfo.cpp
Normal file
@ -0,0 +1,218 @@
|
||||
#include <IO/S3/getObjectInfo.h>
|
||||
|
||||
#if USE_AWS_S3
|
||||
#include <aws/s3/S3Client.h>
|
||||
#include <aws/s3/model/GetObjectAttributesRequest.h>
|
||||
#include <aws/s3/model/GetObjectRequest.h>
|
||||
#include <aws/s3/model/HeadObjectRequest.h>
|
||||
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int S3_ERROR;
|
||||
}
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event S3GetObject;
|
||||
extern const Event S3GetObjectAttributes;
|
||||
extern const Event S3HeadObject;
|
||||
extern const Event DiskS3GetObject;
|
||||
extern const Event DiskS3GetObjectAttributes;
|
||||
extern const Event DiskS3HeadObject;
|
||||
}
|
||||
|
||||
|
||||
namespace DB::S3
|
||||
{
|
||||
|
||||
namespace
|
||||
{
|
||||
Aws::S3::Model::HeadObjectOutcome headObject(
|
||||
const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3HeadObject);
|
||||
if (for_disk_s3)
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3HeadObject);
|
||||
|
||||
Aws::S3::Model::HeadObjectRequest req;
|
||||
req.SetBucket(bucket);
|
||||
req.SetKey(key);
|
||||
|
||||
if (!version_id.empty())
|
||||
req.SetVersionId(version_id);
|
||||
|
||||
return client.HeadObject(req);
|
||||
}
|
||||
|
||||
Aws::S3::Model::GetObjectAttributesOutcome getObjectAttributes(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3GetObjectAttributes);
|
||||
if (for_disk_s3)
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3GetObjectAttributes);
|
||||
|
||||
Aws::S3::Model::GetObjectAttributesRequest req;
|
||||
req.SetBucket(bucket);
|
||||
req.SetKey(key);
|
||||
|
||||
if (!version_id.empty())
|
||||
req.SetVersionId(version_id);
|
||||
|
||||
req.SetObjectAttributes({Aws::S3::Model::ObjectAttributes::ObjectSize});
|
||||
|
||||
return client.GetObjectAttributes(req);
|
||||
}
|
||||
|
||||
Aws::S3::Model::GetObjectOutcome getObjectDummy(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3GetObject);
|
||||
if (for_disk_s3)
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3GetObject);
|
||||
|
||||
Aws::S3::Model::GetObjectRequest req;
|
||||
req.SetBucket(bucket);
|
||||
req.SetKey(key);
|
||||
|
||||
if (!version_id.empty())
|
||||
req.SetVersionId(version_id);
|
||||
|
||||
/// Only the first byte will be read.
|
||||
/// We don't need that first byte but the range should be set otherwise the entire object will be read.
|
||||
req.SetRange("bytes=0-0");
|
||||
|
||||
return client.GetObject(req);
|
||||
}
|
||||
|
||||
|
||||
/// Performs a request to get the size and last modification time of an object.
|
||||
/// The function performs either HeadObject or GetObjectAttributes request depending on the endpoint.
|
||||
std::pair<std::optional<ObjectInfo>, Aws::S3::S3Error> tryGetObjectInfo(
|
||||
const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id,
|
||||
const S3Settings::RequestSettings & request_settings, bool with_metadata, bool for_disk_s3)
|
||||
{
|
||||
if (request_settings.allow_head_object_request)
|
||||
{
|
||||
auto outcome = headObject(client, bucket, key, version_id, for_disk_s3);
|
||||
if (!outcome.IsSuccess())
|
||||
return {std::nullopt, outcome.GetError()};
|
||||
|
||||
const auto & result = outcome.GetResult();
|
||||
ObjectInfo object_info;
|
||||
object_info.size = static_cast<size_t>(result.GetContentLength());
|
||||
object_info.last_modification_time = result.GetLastModified().Millis() / 1000;
|
||||
|
||||
if (with_metadata)
|
||||
object_info.metadata = result.GetMetadata();
|
||||
|
||||
return {object_info, {}};
|
||||
}
|
||||
else
|
||||
{
|
||||
ObjectInfo object_info;
|
||||
|
||||
{
|
||||
auto outcome = getObjectAttributes(client, bucket, key, version_id, for_disk_s3);
|
||||
if (!outcome.IsSuccess())
|
||||
return {std::nullopt, outcome.GetError()};
|
||||
|
||||
const auto & result = outcome.GetResult();
|
||||
object_info.size = static_cast<size_t>(result.GetObjectSize());
|
||||
object_info.last_modification_time = result.GetLastModified().Millis() / 1000;
|
||||
}
|
||||
|
||||
if (with_metadata)
|
||||
{
|
||||
auto outcome = getObjectDummy(client, bucket, key, version_id, for_disk_s3);
|
||||
if (!outcome.IsSuccess())
|
||||
return {std::nullopt, outcome.GetError()};
|
||||
|
||||
const auto & result = outcome.GetResult();
|
||||
object_info.metadata = result.GetMetadata();
|
||||
}
|
||||
|
||||
return {object_info, {}};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
bool isNotFoundError(Aws::S3::S3Errors error)
|
||||
{
|
||||
return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND || error == Aws::S3::S3Errors::NO_SUCH_KEY;
|
||||
}
|
||||
|
||||
ObjectInfo getObjectInfo(
|
||||
const Aws::S3::S3Client & client,
|
||||
const String & bucket,
|
||||
const String & key,
|
||||
const String & version_id,
|
||||
const S3Settings::RequestSettings & request_settings,
|
||||
bool with_metadata,
|
||||
bool for_disk_s3,
|
||||
bool throw_on_error)
|
||||
{
|
||||
auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, with_metadata, for_disk_s3);
|
||||
if (object_info)
|
||||
{
|
||||
return *object_info;
|
||||
}
|
||||
else if (throw_on_error)
|
||||
{
|
||||
throw DB::Exception(ErrorCodes::S3_ERROR,
|
||||
"Failed to get object attributes: {}. HTTP response code: {}",
|
||||
error.GetMessage(), static_cast<size_t>(error.GetResponseCode()));
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
size_t getObjectSize(
|
||||
const Aws::S3::S3Client & client,
|
||||
const String & bucket,
|
||||
const String & key,
|
||||
const String & version_id,
|
||||
const S3Settings::RequestSettings & request_settings,
|
||||
bool for_disk_s3,
|
||||
bool throw_on_error)
|
||||
{
|
||||
return getObjectInfo(client, bucket, key, version_id, request_settings, {}, for_disk_s3, throw_on_error).size;
|
||||
}
|
||||
|
||||
bool objectExists(
|
||||
const Aws::S3::S3Client & client,
|
||||
const String & bucket,
|
||||
const String & key,
|
||||
const String & version_id,
|
||||
const S3Settings::RequestSettings & request_settings,
|
||||
bool for_disk_s3)
|
||||
{
|
||||
auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, {}, for_disk_s3);
|
||||
if (object_info)
|
||||
return true;
|
||||
|
||||
if (isNotFoundError(error.GetErrorType()))
|
||||
return false;
|
||||
|
||||
throw S3Exception(error.GetErrorType(),
|
||||
"Failed to check existence of key {} in bucket {}: {}",
|
||||
key, bucket, error.GetMessage());
|
||||
}
|
||||
|
||||
void checkObjectExists(
|
||||
const Aws::S3::S3Client & client,
|
||||
const String & bucket,
|
||||
const String & key,
|
||||
const String & version_id,
|
||||
const S3Settings::RequestSettings & request_settings,
|
||||
bool for_disk_s3,
|
||||
std::string_view description)
|
||||
{
|
||||
auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, {}, for_disk_s3);
|
||||
if (object_info)
|
||||
return;
|
||||
throw S3Exception(error.GetErrorType(), "{}Object {} in bucket {} suddenly disappeared: {}",
|
||||
(description.empty() ? "" : (String(description) + ": ")), key, bucket, error.GetMessage());
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
63
src/IO/S3/getObjectInfo.h
Normal file
63
src/IO/S3/getObjectInfo.h
Normal file
@ -0,0 +1,63 @@
|
||||
#pragma once
|
||||
|
||||
#include "config.h"
|
||||
|
||||
#if USE_AWS_S3
|
||||
#include <Storages/StorageS3Settings.h>
|
||||
#include <base/types.h>
|
||||
#include <aws/s3/S3Client.h>
|
||||
|
||||
|
||||
namespace DB::S3
|
||||
{
|
||||
|
||||
struct ObjectInfo
|
||||
{
|
||||
size_t size = 0;
|
||||
time_t last_modification_time = 0;
|
||||
|
||||
std::map<String, String> metadata; /// Set only if getObjectInfo() is called with `with_metadata = true`.
|
||||
};
|
||||
|
||||
ObjectInfo getObjectInfo(
|
||||
const Aws::S3::S3Client & client,
|
||||
const String & bucket,
|
||||
const String & key,
|
||||
const String & version_id = {},
|
||||
const S3Settings::RequestSettings & request_settings = {},
|
||||
bool with_metadata = false,
|
||||
bool for_disk_s3 = false,
|
||||
bool throw_on_error = true);
|
||||
|
||||
size_t getObjectSize(
|
||||
const Aws::S3::S3Client & client,
|
||||
const String & bucket,
|
||||
const String & key,
|
||||
const String & version_id = {},
|
||||
const S3Settings::RequestSettings & request_settings = {},
|
||||
bool for_disk_s3 = false,
|
||||
bool throw_on_error = true);
|
||||
|
||||
bool objectExists(
|
||||
const Aws::S3::S3Client & client,
|
||||
const String & bucket,
|
||||
const String & key,
|
||||
const String & version_id = {},
|
||||
const S3Settings::RequestSettings & request_settings = {},
|
||||
bool for_disk_s3 = false);
|
||||
|
||||
/// Throws an exception if a specified object doesn't exist. `description` is used as a part of the error message.
|
||||
void checkObjectExists(
|
||||
const Aws::S3::S3Client & client,
|
||||
const String & bucket,
|
||||
const String & key,
|
||||
const String & version_id = {},
|
||||
const S3Settings::RequestSettings & request_settings = {},
|
||||
bool for_disk_s3 = false,
|
||||
std::string_view description = {});
|
||||
|
||||
bool isNotFoundError(Aws::S3::S3Errors error);
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -705,92 +705,6 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
/// Extracts the endpoint from a constructed S3 client.
|
||||
String getEndpoint(const Aws::S3::S3Client & client)
|
||||
{
|
||||
const auto * endpoint_provider = dynamic_cast<const Aws::S3::Endpoint::S3DefaultEpProviderBase *>(const_cast<Aws::S3::S3Client &>(client).accessEndpointProvider().get());
|
||||
if (!endpoint_provider)
|
||||
return {};
|
||||
String endpoint;
|
||||
endpoint_provider->GetBuiltInParameters().GetParameter("Endpoint").GetString(endpoint);
|
||||
return endpoint;
|
||||
}
|
||||
|
||||
/// Performs a request to get the size and last modification time of an object.
|
||||
/// The function performs either HeadObject or GetObjectAttributes request depending on the endpoint.
|
||||
std::pair<std::optional<DB::S3::ObjectInfo>, Aws::S3::S3Error> tryGetObjectInfo(
|
||||
const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
|
||||
{
|
||||
auto endpoint = getEndpoint(client);
|
||||
bool use_get_object_attributes_request = (endpoint.find(".amazonaws.com") != String::npos);
|
||||
|
||||
if (use_get_object_attributes_request)
|
||||
{
|
||||
/// It's better not to use `HeadObject` requests for AWS S3 because they don't work well with the global region.
|
||||
/// Details: `HeadObject` request never returns a response body (even if there is an error) however
|
||||
/// if the request was sent without specifying a region in the endpoint (i.e. for example "https://test.s3.amazonaws.com/mydata.csv"
|
||||
/// instead of "https://test.s3-us-west-2.amazonaws.com/mydata.csv") then that response body is one of the main ways
|
||||
/// to determine the correct region and try to repeat the request again with the correct region.
|
||||
/// For any other request type (`GetObject`, `ListObjects`, etc.) AWS SDK does that because they have response bodies,
|
||||
/// but for `HeadObject` there is no response body so this way doesn't work. That's why we use `GetObjectAttributes` request instead.
|
||||
/// See https://github.com/aws/aws-sdk-cpp/issues/1558 and also the function S3ErrorMarshaller::ExtractRegion() for more information.
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::S3GetObjectAttributes);
|
||||
if (for_disk_s3)
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3GetObjectAttributes);
|
||||
|
||||
Aws::S3::Model::GetObjectAttributesRequest req;
|
||||
req.SetBucket(bucket);
|
||||
req.SetKey(key);
|
||||
|
||||
if (!version_id.empty())
|
||||
req.SetVersionId(version_id);
|
||||
|
||||
req.SetObjectAttributes({Aws::S3::Model::ObjectAttributes::ObjectSize});
|
||||
|
||||
auto outcome = client.GetObjectAttributes(req);
|
||||
if (outcome.IsSuccess())
|
||||
{
|
||||
const auto & result = outcome.GetResult();
|
||||
DB::S3::ObjectInfo object_info;
|
||||
object_info.size = static_cast<size_t>(result.GetObjectSize());
|
||||
object_info.last_modification_time = result.GetLastModified().Millis() / 1000;
|
||||
return {object_info, {}};
|
||||
}
|
||||
|
||||
return {std::nullopt, outcome.GetError()};
|
||||
}
|
||||
else
|
||||
{
|
||||
/// By default we use `HeadObject` requests.
|
||||
/// We cannot just use `GetObjectAttributes` requests always because some S3 providers (e.g. Minio)
|
||||
/// don't support `GetObjectAttributes` requests.
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::S3HeadObject);
|
||||
if (for_disk_s3)
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3HeadObject);
|
||||
|
||||
Aws::S3::Model::HeadObjectRequest req;
|
||||
req.SetBucket(bucket);
|
||||
req.SetKey(key);
|
||||
|
||||
if (!version_id.empty())
|
||||
req.SetVersionId(version_id);
|
||||
|
||||
auto outcome = client.HeadObject(req);
|
||||
if (outcome.IsSuccess())
|
||||
{
|
||||
const auto & result = outcome.GetResult();
|
||||
DB::S3::ObjectInfo object_info;
|
||||
object_info.size = static_cast<size_t>(result.GetContentLength());
|
||||
object_info.last_modification_time = result.GetLastModified().Millis() / 1000;
|
||||
return {object_info, {}};
|
||||
}
|
||||
|
||||
return {std::nullopt, outcome.GetError()};
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -982,88 +896,6 @@ namespace S3
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bucket name length is out of bounds in virtual hosted style S3 URI: {}{}",
|
||||
quoteString(bucket), !uri.empty() ? " (" + uri.toString() + ")" : "");
|
||||
}
|
||||
|
||||
bool isNotFoundError(Aws::S3::S3Errors error)
|
||||
{
|
||||
return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND || error == Aws::S3::S3Errors::NO_SUCH_KEY;
|
||||
}
|
||||
|
||||
ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3, bool throw_on_error)
|
||||
{
|
||||
auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, for_disk_s3);
|
||||
if (object_info)
|
||||
{
|
||||
return *object_info;
|
||||
}
|
||||
else if (throw_on_error)
|
||||
{
|
||||
throw DB::Exception(ErrorCodes::S3_ERROR,
|
||||
"Failed to get object attributes: {}. HTTP response code: {}",
|
||||
error.GetMessage(), static_cast<size_t>(error.GetResponseCode()));
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3, bool throw_on_error)
|
||||
{
|
||||
return getObjectInfo(client, bucket, key, version_id, for_disk_s3, throw_on_error).size;
|
||||
}
|
||||
|
||||
bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
|
||||
{
|
||||
auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, for_disk_s3);
|
||||
if (object_info)
|
||||
return true;
|
||||
|
||||
if (isNotFoundError(error.GetErrorType()))
|
||||
return false;
|
||||
|
||||
throw S3Exception(error.GetErrorType(),
|
||||
"Failed to check existence of key {} in bucket {}: {}",
|
||||
key, bucket, error.GetMessage());
|
||||
}
|
||||
|
||||
void checkObjectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3, std::string_view description)
|
||||
{
|
||||
auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, for_disk_s3);
|
||||
if (object_info)
|
||||
return;
|
||||
throw S3Exception(error.GetErrorType(), "{}Object {} in bucket {} suddenly disappeared: {}",
|
||||
(description.empty() ? "" : (String(description) + ": ")), key, bucket, error.GetMessage());
|
||||
}
|
||||
|
||||
std::map<String, String> getObjectMetadata(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3, bool throw_on_error)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3GetObjectMetadata);
|
||||
if (for_disk_s3)
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3GetObjectMetadata);
|
||||
|
||||
/// We must not use the `HeadObject` request, see the comment about `HeadObjectRequest` in S3Common.h.
|
||||
|
||||
Aws::S3::Model::GetObjectRequest req;
|
||||
req.SetBucket(bucket);
|
||||
req.SetKey(key);
|
||||
|
||||
/// Only the first byte will be read.
|
||||
/// We don't need that first byte but the range should be set otherwise the entire object will be read.
|
||||
req.SetRange("bytes=0-0");
|
||||
|
||||
if (!version_id.empty())
|
||||
req.SetVersionId(version_id);
|
||||
|
||||
auto outcome = client.GetObject(req);
|
||||
|
||||
if (outcome.IsSuccess())
|
||||
return outcome.GetResult().GetMetadata();
|
||||
|
||||
if (!throw_on_error)
|
||||
return {};
|
||||
|
||||
const auto & error = outcome.GetError();
|
||||
throw S3Exception(error.GetErrorType(),
|
||||
"Failed to get metadata of key {} in bucket {}: {}",
|
||||
key, bucket, error.GetMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -121,29 +121,6 @@ struct URI
|
||||
static void validateBucket(const String & bucket, const Poco::URI & uri);
|
||||
};
|
||||
|
||||
/// WARNING: Don't use `HeadObjectRequest`! Use the functions below instead.
|
||||
/// For explanation see the comment about `HeadObject` request in the function tryGetObjectInfo().
|
||||
|
||||
struct ObjectInfo
|
||||
{
|
||||
size_t size = 0;
|
||||
time_t last_modification_time = 0;
|
||||
};
|
||||
|
||||
ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false, bool throw_on_error = true);
|
||||
|
||||
size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false, bool throw_on_error = true);
|
||||
|
||||
bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false);
|
||||
|
||||
/// Throws an exception if a specified object doesn't exist. `description` is used as a part of the error message.
|
||||
void checkObjectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false, std::string_view description = {});
|
||||
|
||||
bool isNotFoundError(Aws::S3::S3Errors error);
|
||||
|
||||
/// Returns the object's metadata.
|
||||
std::map<String, String> getObjectMetadata(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false, bool throw_on_error = true);
|
||||
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -9,7 +9,7 @@
|
||||
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/S3Common.h>
|
||||
#include <IO/S3/getObjectInfo.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
#include <aws/s3/S3Client.h>
|
||||
@ -76,7 +76,7 @@ WriteBufferFromS3::WriteBufferFromS3(
|
||||
std::shared_ptr<const Aws::S3::S3Client> client_ptr_,
|
||||
const String & bucket_,
|
||||
const String & key_,
|
||||
const S3Settings::RequestSettings & request_settings,
|
||||
const S3Settings::RequestSettings & request_settings_,
|
||||
std::optional<std::map<String, String>> object_metadata_,
|
||||
size_t buffer_size_,
|
||||
ThreadPoolCallbackRunner<void> schedule_,
|
||||
@ -84,12 +84,11 @@ WriteBufferFromS3::WriteBufferFromS3(
|
||||
: BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
|
||||
, bucket(bucket_)
|
||||
, key(key_)
|
||||
, settings(request_settings.getUploadSettings())
|
||||
, check_objects_after_upload(request_settings.check_objects_after_upload)
|
||||
, max_unexpected_write_error_retries(request_settings.max_unexpected_write_error_retries)
|
||||
, request_settings(request_settings_)
|
||||
, upload_settings(request_settings.getUploadSettings())
|
||||
, client_ptr(std::move(client_ptr_))
|
||||
, object_metadata(std::move(object_metadata_))
|
||||
, upload_part_size(settings.min_upload_part_size)
|
||||
, upload_part_size(upload_settings.min_upload_part_size)
|
||||
, schedule(std::move(schedule_))
|
||||
, write_settings(write_settings_)
|
||||
{
|
||||
@ -114,7 +113,7 @@ void WriteBufferFromS3::nextImpl()
|
||||
write_settings.remote_throttler->add(offset(), ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds);
|
||||
|
||||
/// Data size exceeds singlepart upload threshold, need to use multipart upload.
|
||||
if (multipart_upload_id.empty() && last_part_size > settings.max_single_part_upload_size)
|
||||
if (multipart_upload_id.empty() && last_part_size > upload_settings.max_single_part_upload_size)
|
||||
createMultipartUpload();
|
||||
|
||||
chassert(upload_part_size > 0);
|
||||
@ -182,10 +181,10 @@ void WriteBufferFromS3::finalizeImpl()
|
||||
if (!multipart_upload_id.empty())
|
||||
completeMultipartUpload();
|
||||
|
||||
if (check_objects_after_upload)
|
||||
if (request_settings.check_objects_after_upload)
|
||||
{
|
||||
LOG_TRACE(log, "Checking object {} exists after upload", key);
|
||||
S3::checkObjectExists(*client_ptr, bucket, key, {}, /* for_disk_s3= */ write_settings.for_object_storage, "Immediately after upload");
|
||||
S3::checkObjectExists(*client_ptr, bucket, key, {}, request_settings, /* for_disk_s3= */ write_settings.for_object_storage, "Immediately after upload");
|
||||
LOG_TRACE(log, "Object {} exists after upload", key);
|
||||
}
|
||||
}
|
||||
@ -303,7 +302,10 @@ void WriteBufferFromS3::fillUploadRequest(Aws::S3::Model::UploadPartRequest & re
|
||||
{
|
||||
/// Increase part number.
|
||||
++part_number;
|
||||
if (!multipart_upload_id.empty() && (part_number > settings.max_part_number))
|
||||
|
||||
auto max_part_number = upload_settings.max_part_number;
|
||||
|
||||
if (!multipart_upload_id.empty() && (part_number > max_part_number))
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::INVALID_CONFIG_PARAMETER,
|
||||
@ -311,10 +313,11 @@ void WriteBufferFromS3::fillUploadRequest(Aws::S3::Model::UploadPartRequest & re
|
||||
"Check min_upload_part_size = {}, max_upload_part_size = {}, "
|
||||
"upload_part_size_multiply_factor = {}, upload_part_size_multiply_parts_count_threshold = {}, "
|
||||
"max_single_part_upload_size = {}",
|
||||
settings.max_part_number, count(), settings.min_upload_part_size, settings.max_upload_part_size,
|
||||
settings.upload_part_size_multiply_factor,
|
||||
settings.upload_part_size_multiply_parts_count_threshold,
|
||||
settings.max_single_part_upload_size);
|
||||
max_part_number, count(),
|
||||
upload_settings.min_upload_part_size, upload_settings.max_upload_part_size,
|
||||
upload_settings.upload_part_size_multiply_factor,
|
||||
upload_settings.upload_part_size_multiply_parts_count_threshold,
|
||||
upload_settings.max_single_part_upload_size);
|
||||
}
|
||||
|
||||
/// Setup request.
|
||||
@ -329,10 +332,13 @@ void WriteBufferFromS3::fillUploadRequest(Aws::S3::Model::UploadPartRequest & re
|
||||
req.SetContentType("binary/octet-stream");
|
||||
|
||||
/// Maybe increase `upload_part_size` (we need to increase it sometimes to keep `part_number` less or equal than `max_part_number`).
|
||||
if (!multipart_upload_id.empty() && (part_number % settings.upload_part_size_multiply_parts_count_threshold == 0))
|
||||
auto threshold = upload_settings.upload_part_size_multiply_parts_count_threshold;
|
||||
if (!multipart_upload_id.empty() && (part_number % threshold == 0))
|
||||
{
|
||||
upload_part_size *= settings.upload_part_size_multiply_factor;
|
||||
upload_part_size = std::min(upload_part_size, settings.max_upload_part_size);
|
||||
auto max_upload_part_size = upload_settings.max_upload_part_size;
|
||||
auto upload_part_size_multiply_factor = upload_settings.upload_part_size_multiply_factor;
|
||||
upload_part_size *= upload_part_size_multiply_factor;
|
||||
upload_part_size = std::min(upload_part_size, max_upload_part_size);
|
||||
}
|
||||
}
|
||||
|
||||
@ -377,7 +383,7 @@ void WriteBufferFromS3::completeMultipartUpload()
|
||||
|
||||
req.SetMultipartUpload(multipart_upload);
|
||||
|
||||
size_t max_retry = std::max(max_unexpected_write_error_retries, 1UL);
|
||||
size_t max_retry = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
|
||||
for (size_t i = 0; i < max_retry; ++i)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3CompleteMultipartUpload);
|
||||
@ -476,8 +482,8 @@ void WriteBufferFromS3::fillPutRequest(Aws::S3::Model::PutObjectRequest & req)
|
||||
req.SetBody(temporary_buffer);
|
||||
if (object_metadata.has_value())
|
||||
req.SetMetadata(object_metadata.value());
|
||||
if (!settings.storage_class_name.empty())
|
||||
req.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(settings.storage_class_name));
|
||||
if (!upload_settings.storage_class_name.empty())
|
||||
req.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(upload_settings.storage_class_name));
|
||||
|
||||
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
|
||||
req.SetContentType("binary/octet-stream");
|
||||
@ -485,7 +491,7 @@ void WriteBufferFromS3::fillPutRequest(Aws::S3::Model::PutObjectRequest & req)
|
||||
|
||||
void WriteBufferFromS3::processPutRequest(const PutObjectTask & task)
|
||||
{
|
||||
size_t max_retry = std::max(max_unexpected_write_error_retries, 1UL);
|
||||
size_t max_retry = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
|
||||
for (size_t i = 0; i < max_retry; ++i)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3PutObject);
|
||||
|
@ -50,7 +50,7 @@ public:
|
||||
std::shared_ptr<const Aws::S3::S3Client> client_ptr_,
|
||||
const String & bucket_,
|
||||
const String & key_,
|
||||
const S3Settings::RequestSettings & request_settings,
|
||||
const S3Settings::RequestSettings & request_settings_,
|
||||
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
|
||||
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
ThreadPoolCallbackRunner<void> schedule_ = {},
|
||||
@ -88,9 +88,8 @@ private:
|
||||
|
||||
const String bucket;
|
||||
const String key;
|
||||
const S3Settings::RequestSettings::PartUploadSettings settings;
|
||||
const bool check_objects_after_upload = false;
|
||||
const size_t max_unexpected_write_error_retries = 4;
|
||||
const S3Settings::RequestSettings request_settings;
|
||||
const S3Settings::RequestSettings::PartUploadSettings & upload_settings;
|
||||
const std::shared_ptr<const Aws::S3::S3Client> client_ptr;
|
||||
const std::optional<std::map<String, String>> object_metadata;
|
||||
|
||||
|
@ -416,6 +416,7 @@ public:
|
||||
const std::string & version_id_,
|
||||
const std::vector<String> & keys_,
|
||||
const String & bucket_,
|
||||
const S3Settings::RequestSettings & request_settings_,
|
||||
ASTPtr query_,
|
||||
const Block & virtual_header_,
|
||||
ContextPtr context_,
|
||||
@ -469,7 +470,7 @@ public:
|
||||
/// (which means we eventually need this info anyway, so it should be ok to do it now)
|
||||
if (object_infos_)
|
||||
{
|
||||
info = S3::getObjectInfo(client_, bucket, key, version_id_);
|
||||
info = S3::getObjectInfo(client_, bucket, key, version_id_, request_settings_);
|
||||
total_size += info->size;
|
||||
|
||||
String path = fs::path(bucket) / key;
|
||||
@ -510,14 +511,15 @@ StorageS3Source::KeysIterator::KeysIterator(
|
||||
const std::string & version_id_,
|
||||
const std::vector<String> & keys_,
|
||||
const String & bucket_,
|
||||
const S3Settings::RequestSettings & request_settings_,
|
||||
ASTPtr query,
|
||||
const Block & virtual_header,
|
||||
ContextPtr context,
|
||||
ObjectInfos * object_infos,
|
||||
Strings * read_keys)
|
||||
: pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>(
|
||||
client_, version_id_, keys_, bucket_, query,
|
||||
virtual_header, context, object_infos, read_keys))
|
||||
client_, version_id_, keys_, bucket_, request_settings_,
|
||||
query, virtual_header, context, object_infos, read_keys))
|
||||
{
|
||||
}
|
||||
|
||||
@ -585,7 +587,7 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
|
||||
if (current_key.empty())
|
||||
return {};
|
||||
|
||||
size_t object_size = info ? info->size : S3::getObjectSize(*client, bucket, current_key, version_id);
|
||||
size_t object_size = info ? info->size : S3::getObjectSize(*client, bucket, current_key, version_id, request_settings);
|
||||
|
||||
int zstd_window_log_max = static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max);
|
||||
auto read_buf = wrapReadBufferWithCompressionMethod(
|
||||
@ -1009,7 +1011,7 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
|
||||
{
|
||||
return std::make_shared<StorageS3Source::KeysIterator>(
|
||||
*s3_configuration.client, s3_configuration.uri.version_id, keys,
|
||||
s3_configuration.uri.bucket, query, virtual_block, local_context,
|
||||
s3_configuration.uri.bucket, s3_configuration.request_settings, query, virtual_block, local_context,
|
||||
object_infos, read_keys);
|
||||
}
|
||||
}
|
||||
@ -1144,7 +1146,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
|
||||
|
||||
bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert;
|
||||
|
||||
if (!truncate_in_insert && S3::objectExists(*s3_configuration.client, s3_configuration.uri.bucket, keys.back(), s3_configuration.uri.version_id))
|
||||
if (!truncate_in_insert && S3::objectExists(*s3_configuration.client, s3_configuration.uri.bucket, keys.back(), s3_configuration.uri.version_id, s3_configuration.request_settings))
|
||||
{
|
||||
if (local_context->getSettingsRef().s3_create_new_file_on_insert)
|
||||
{
|
||||
@ -1156,7 +1158,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
|
||||
new_key = keys[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : keys[0].substr(pos));
|
||||
++index;
|
||||
}
|
||||
while (S3::objectExists(*s3_configuration.client, s3_configuration.uri.bucket, new_key, s3_configuration.uri.version_id));
|
||||
while (S3::objectExists(*s3_configuration.client, s3_configuration.uri.bucket, new_key, s3_configuration.uri.version_id, s3_configuration.request_settings));
|
||||
keys.push_back(new_key);
|
||||
}
|
||||
else
|
||||
@ -1541,7 +1543,8 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
|
||||
/// Note that in case of exception in getObjectInfo returned info will be empty,
|
||||
/// but schema cache will handle this case and won't return columns from cache
|
||||
/// because we can't say that it's valid without last modification time.
|
||||
info = S3::getObjectInfo(*s3_configuration.client, s3_configuration.uri.bucket, *it, s3_configuration.uri.version_id, {}, /* throw_on_error= */ false);
|
||||
info = S3::getObjectInfo(*s3_configuration.client, s3_configuration.uri.bucket, *it, s3_configuration.uri.version_id, s3_configuration.request_settings,
|
||||
{}, {}, /* throw_on_error= */ false);
|
||||
if (object_infos)
|
||||
(*object_infos)[path] = info;
|
||||
}
|
||||
|
@ -15,7 +15,7 @@
|
||||
#include <Processors/Executors/PullingPipelineExecutor.h>
|
||||
#include <Poco/URI.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <IO/S3Common.h>
|
||||
#include <IO/S3/getObjectInfo.h>
|
||||
#include <IO/CompressionMethod.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/threadPoolCallbackRunner.h>
|
||||
@ -92,6 +92,7 @@ public:
|
||||
const std::string & version_id_,
|
||||
const std::vector<String> & keys_,
|
||||
const String & bucket_,
|
||||
const S3Settings::RequestSettings & request_settings_,
|
||||
ASTPtr query,
|
||||
const Block & virtual_header,
|
||||
ContextPtr context,
|
||||
|
@ -166,6 +166,7 @@ S3Settings::RequestSettings::RequestSettings(const NamedCollection & collection)
|
||||
max_single_read_retries = collection.getOrDefault<UInt64>("max_single_read_retries", max_single_read_retries);
|
||||
max_connections = collection.getOrDefault<UInt64>("max_connections", max_connections);
|
||||
list_object_keys_size = collection.getOrDefault<UInt64>("list_object_keys_size", list_object_keys_size);
|
||||
allow_head_object_request = collection.getOrDefault<bool>("allow_head_object_request", allow_head_object_request);
|
||||
}
|
||||
|
||||
S3Settings::RequestSettings::RequestSettings(
|
||||
@ -180,6 +181,7 @@ S3Settings::RequestSettings::RequestSettings(
|
||||
max_connections = config.getUInt64(key + "max_connections", settings.s3_max_connections);
|
||||
check_objects_after_upload = config.getBool(key + "check_objects_after_upload", settings.s3_check_objects_after_upload);
|
||||
list_object_keys_size = config.getUInt64(key + "list_object_keys_size", settings.s3_list_object_keys_size);
|
||||
allow_head_object_request = config.getBool(key + "allow_head_object_request", allow_head_object_request);
|
||||
|
||||
/// NOTE: it would be better to reuse old throttlers to avoid losing token bucket state on every config reload,
|
||||
/// which could lead to exceeding limit for short time. But it is good enough unless very high `burst` values are used.
|
||||
|
@ -67,6 +67,16 @@ struct S3Settings
|
||||
ThrottlerPtr get_request_throttler;
|
||||
ThrottlerPtr put_request_throttler;
|
||||
|
||||
/// If this is set to false then `S3::getObjectSize()` will use `GetObjectAttributes` request instead of `HeadObject`.
|
||||
/// Details: `HeadObject` request never returns a response body (even if there is an error) however
|
||||
/// if the request was sent without specifying a region in the endpoint (i.e. for example "https://test.s3.amazonaws.com/mydata.csv"
|
||||
/// instead of "https://test.s3-us-west-2.amazonaws.com/mydata.csv") then that response body is one of the main ways to determine
|
||||
/// the correct region and try to repeat the request again with the correct region.
|
||||
/// For any other request type (`GetObject`, `ListObjects`, etc.) AWS SDK does that because they have response bodies, but for `HeadObject`
|
||||
/// there is no response body so this way doesn't work. That's why it's better to use `GetObjectAttributes` requests sometimes.
|
||||
/// See https://github.com/aws/aws-sdk-cpp/issues/1558 and also the function S3ErrorMarshaller::ExtractRegion() for more information.
|
||||
bool allow_head_object_request = true;
|
||||
|
||||
const PartUploadSettings & getUploadSettings() const { return upload_settings; }
|
||||
|
||||
RequestSettings() = default;
|
||||
|
Loading…
Reference in New Issue
Block a user