Merge pull request #61108 from ClickHouse/chesema-no-such-key
Distinct message for the S3 'no such key' error for the disk and storage cases.
Commit b821698463
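The gist of the change: the S3 (and Azure) clients now carry a per-client "created for a disk" marker, and the S3 client appends a disk-specific hint to failed responses, so a missing key behind an s3 disk reads differently from the same error raised by the s3 table function. Below is a minimal, self-contained sketch of that enrichment pattern; the Outcome, Error, and Client types are simplified stand-ins, not the AWS SDK or ClickHouse classes, and only the control flow mirrors the Client::enrichErrorMessage introduced in the hunks that follow.

// Minimal sketch (stand-in types, not the real AWS SDK / ClickHouse classes)
// of the error-message enrichment this commit adds for disk clients.
#include <iostream>
#include <string>
#include <utility>

struct Error
{
    std::string message;
};

struct Outcome
{
    bool success = false;
    Error error;

    bool IsSuccess() const { return success; }
    const Error & GetError() const { return error; }
};

class Client
{
public:
    explicit Client(bool for_disk_s3_) : for_disk_s3(for_disk_s3_) {}

    bool isClientForDisk() const { return for_disk_s3; }

    // Failed outcomes from a disk client get a disk-specific suffix;
    // successful outcomes and storage (table-function) clients pass through unchanged.
    Outcome enrichErrorMessage(Outcome && outcome) const
    {
        if (outcome.IsSuccess() || !isClientForDisk())
            return std::move(outcome);

        Outcome enriched = std::move(outcome);
        enriched.error.message += " This error happened for S3 disk.";
        return enriched;
    }

private:
    bool for_disk_s3;
};

int main()
{
    Outcome failed;
    failed.error.message = "The specified key does not exist.";

    Client disk_client(/* for_disk_s3= */ true);
    Client storage_client(/* for_disk_s3= */ false);

    std::cout << disk_client.enrichErrorMessage(Outcome(failed)).GetError().message << '\n';
    std::cout << storage_client.enrichErrorMessage(Outcome(failed)).GetError().message << '\n';
}

In the diff itself the wrapping is applied to Client::HeadObject and Client::GetObject, the calls an S3 disk makes when it looks up a data blob.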
@@ -14,6 +14,7 @@
#include <Disks/DiskType.h>
#include <Poco/Util/AbstractConfiguration.h>
+#include <azure/storage/blobs/blob_options.hpp>

#include <filesystem>
@@ -38,6 +39,8 @@ BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage(
, configuration(configuration_)
{
auto client_ptr = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
+client_ptr->SetClickhouseOptions(Azure::Storage::Blobs::ClickhouseClientOptions{.IsClientForDisk=true});

object_storage = std::make_unique<AzureObjectStorage>("BackupReaderAzureBlobStorage",
std::move(client_ptr),
StorageAzureBlob::createSettings(context_),
@@ -97,8 +100,7 @@ void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup,
/* dest_path */ blob_path[0],
settings,
read_settings,
-threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupRDAzure"),
-/* for_disk_azure_blob_storage= */ true);
+threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupRDAzure"));

return file_size;
};
@@ -123,6 +125,8 @@ BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
, configuration(configuration_)
{
auto client_ptr = StorageAzureBlob::createClient(configuration, /* is_read_only */ false, attempt_to_create_container);
+client_ptr->SetClickhouseOptions(Azure::Storage::Blobs::ClickhouseClientOptions{.IsClientForDisk=true});

object_storage = std::make_unique<AzureObjectStorage>("BackupWriterAzureBlobStorage",
std::move(client_ptr),
StorageAzureBlob::createSettings(context_),
@@ -177,8 +181,7 @@ void BackupWriterAzureBlobStorage::copyFile(const String & destination, const St
/* dest_path */ destination,
settings,
read_settings,
-threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"),
-/* for_disk_azure_blob_storage= */ true);
+threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"));
}

void BackupWriterAzureBlobStorage::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
@@ -225,7 +225,7 @@ void ReadBufferFromAzureBlobStorage::initialize()
try
{
ProfileEvents::increment(ProfileEvents::AzureGetObject);
-if (read_settings.for_object_storage)
+if (blob_container_client->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureGetObject);

auto download_response = blob_client->Download(download_options);
@@ -279,7 +279,7 @@ size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t ran
try
{
ProfileEvents::increment(ProfileEvents::AzureGetObject);
-if (read_settings.for_object_storage)
+if (blob_container_client->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureGetObject);

Azure::Storage::Blobs::DownloadBlobOptions download_options;
@@ -5,6 +5,7 @@
#include <Common/Exception.h>
#include <Common/re2.h>
#include <azure/identity/managed_identity_credential.hpp>
+#include <azure/storage/blobs/blob_options.hpp>
#include <azure/core/http/curl_transport.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>
@@ -206,6 +207,8 @@ Azure::Storage::Blobs::BlobClientOptions getAzureBlobClientOptions(const Poco::U
client_options.Retry = retry_options;
client_options.Transport.Transport = std::make_shared<Azure::Core::Http::CurlTransport>(curl_options);

+client_options.ClickhouseOptions = Azure::Storage::Blobs::ClickhouseClientOptions{.IsClientForDisk=true};

return client_options;
}
@@ -69,6 +69,7 @@ private:
bool getBatchAndCheckNext(RelativePathsWithMetadata & batch) override
{
ProfileEvents::increment(ProfileEvents::AzureListObjects);
+if (client->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureListObjects);

batch.clear();
@@ -130,6 +131,7 @@ bool AzureObjectStorage::exists(const StoredObject & object) const
options.PageSizeHint = 1;

ProfileEvents::increment(ProfileEvents::AzureListObjects);
+if (client_ptr->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureListObjects);

auto blobs_list_response = client_ptr->ListBlobs(options);
@@ -169,6 +171,7 @@ void AzureObjectStorage::listObjects(const std::string & path, RelativePathsWith
while (true)
{
ProfileEvents::increment(ProfileEvents::AzureListObjects);
+if (client_ptr->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureListObjects);

blob_list_response = client_ptr->ListBlobs(options);
@@ -298,6 +301,7 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
void AzureObjectStorage::removeObjectImpl(const StoredObject & object, const SharedAzureClientPtr & client_ptr, bool if_exists)
{
ProfileEvents::increment(ProfileEvents::AzureDeleteObjects);
+if (client_ptr->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureDeleteObjects);

const auto & path = object.remote_path;
@@ -353,13 +357,14 @@ void AzureObjectStorage::removeObjectsIfExist(const StoredObjects & objects)

ObjectMetadata AzureObjectStorage::getObjectMetadata(const std::string & path) const
{
-ProfileEvents::increment(ProfileEvents::AzureGetProperties);
-ProfileEvents::increment(ProfileEvents::DiskAzureGetProperties);
auto client_ptr = client.get();
auto blob_client = client_ptr->GetBlobClient(path);
auto properties = blob_client.GetProperties().Value;

+ProfileEvents::increment(ProfileEvents::AzureGetProperties);
+if (client_ptr->GetClickhouseOptions().IsClientForDisk)
+ProfileEvents::increment(ProfileEvents::DiskAzureGetProperties);

ObjectMetadata result;
result.size_bytes = properties.BlobSize;
if (!properties.Metadata.empty())
@@ -391,6 +396,7 @@ void AzureObjectStorage::copyObject( /// NOLINT
}

ProfileEvents::increment(ProfileEvents::AzureCopyObject);
+if (client_ptr->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);

dest_blob_client.CopyFromUri(source_blob_client.GetUrl(), copy_options);
@@ -84,16 +84,12 @@ const std::string & IObjectStorage::getCacheName() const

ReadSettings IObjectStorage::patchSettings(const ReadSettings & read_settings) const
{
-ReadSettings settings{read_settings};
-settings.for_object_storage = true;
-return settings;
+return read_settings;
}

WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings) const
{
-WriteSettings settings{write_settings};
-settings.for_object_storage = true;
-return settings;
+return write_settings;
}

}
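With patchSettings reduced to a pass-through, the choice of profile-event family no longer travels through ReadSettings/WriteSettings: the client created for a disk answers isClientForDisk() (S3) or GetClickhouseOptions().IsClientForDisk (Azure) directly, which is the pattern the remaining hunks repeat. A rough sketch of that call-site shape, using simplified stand-in counter and client types rather than the real ProfileEvents machinery:

// Sketch of the accounting pattern: the client remembers whether it was
// created for a disk, so call sites no longer thread a for_disk_* flag.
#include <atomic>
#include <iostream>

namespace ProfileEvents
{
    std::atomic<unsigned> S3GetObject{0};
    std::atomic<unsigned> DiskS3GetObject{0};
}

class Client
{
public:
    explicit Client(bool for_disk_s3_) : for_disk_s3(for_disk_s3_) {}
    bool isClientForDisk() const { return for_disk_s3; }

private:
    bool for_disk_s3;
};

void accountGetObject(const Client & client)
{
    ++ProfileEvents::S3GetObject;          // generic counter, always incremented
    if (client.isClientForDisk())          // disk-specific counter only for disk clients
        ++ProfileEvents::DiskS3GetObject;
}

int main()
{
    Client disk_client(true);
    Client storage_client(false);

    accountGetObject(disk_client);
    accountGetObject(storage_client);

    std::cout << ProfileEvents::S3GetObject << " " << ProfileEvents::DiskS3GetObject << "\n"; // prints "2 1"
}

Because the client now owns the flag, helpers such as getObjectInfo and copyS3File can drop their trailing for_disk_s3 parameters, which is exactly what the signature changes below do.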
@@ -158,7 +158,7 @@ private:
bool S3ObjectStorage::exists(const StoredObject & object) const
{
auto settings_ptr = s3_settings.get();
-return S3::objectExists(*client.get(), uri.bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
+return S3::objectExists(*client.get(), uri.bucket, object.remote_path, {}, settings_ptr->request_settings);
}

std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
@@ -425,7 +425,7 @@ void S3ObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::string & path) const
{
auto settings_ptr = s3_settings.get();
-auto object_info = S3::getObjectInfo(*client.get(), uri.bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
+auto object_info = S3::getObjectInfo(*client.get(), uri.bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* throw_on_error= */ false);

if (object_info.size == 0 && object_info.last_modification_time == 0 && object_info.metadata.empty())
return {};
@@ -441,7 +441,7 @@ std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::s
ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) const
{
auto settings_ptr = s3_settings.get();
-auto object_info = S3::getObjectInfo(*client.get(), uri.bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
+auto object_info = S3::getObjectInfo(*client.get(), uri.bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true);

ObjectMetadata result;
result.size_bytes = object_info.size;
@@ -464,9 +464,11 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
{
auto current_client = dest_s3->client.get();
auto settings_ptr = s3_settings.get();
-auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
+auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {}, settings_ptr->request_settings);
auto scheduler = threadPoolCallbackRunnerUnsafe<void>(getThreadPoolWriter(), "S3ObjStor_copy");
-try {
+
+try
+{
copyS3File(
current_client,
uri.bucket,
@@ -479,8 +481,7 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
patchSettings(read_settings),
BlobStorageLogWriter::create(disk_name),
object_to_attributes,
-scheduler,
-/* for_disk_s3= */ true);
+scheduler);
return;
}
catch (S3Exception & exc)
@@ -506,8 +507,9 @@ void S3ObjectStorage::copyObject( // NOLINT
{
auto current_client = client.get();
auto settings_ptr = s3_settings.get();
-auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
+auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {}, settings_ptr->request_settings);
auto scheduler = threadPoolCallbackRunnerUnsafe<void>(getThreadPoolWriter(), "S3ObjStor_copy");

copyS3File(current_client,
uri.bucket,
object_from.remote_path,
@@ -519,8 +521,7 @@ void S3ObjectStorage::copyObject( // NOLINT
patchSettings(read_settings),
BlobStorageLogWriter::create(disk_name),
object_to_attributes,
-scheduler,
-/* for_disk_s3= */ true);
+scheduler);
}

void S3ObjectStorage::setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_)
@@ -46,7 +46,6 @@ namespace
const String & dest_blob_,
std::shared_ptr<const AzureObjectStorageSettings> settings_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
-bool for_disk_azure_blob_storage_,
const Poco::Logger * log_)
: create_read_buffer(create_read_buffer_)
, client(client_)
@@ -56,7 +55,6 @@ namespace
, dest_blob(dest_blob_)
, settings(settings_)
, schedule(schedule_)
-, for_disk_azure_blob_storage(for_disk_azure_blob_storage_)
, log(log_)
, max_single_part_upload_size(settings_->max_single_part_upload_size)
{
@@ -73,7 +71,6 @@ namespace
const String & dest_blob;
std::shared_ptr<const AzureObjectStorageSettings> settings;
ThreadPoolCallbackRunnerUnsafe<void> schedule;
-bool for_disk_azure_blob_storage;
const Poco::Logger * log;
size_t max_single_part_upload_size;

@@ -217,7 +214,7 @@ namespace
void processUploadPartRequest(UploadPartTask & task)
{
ProfileEvents::increment(ProfileEvents::AzureUploadPart);
-if (for_disk_azure_blob_storage)
+if (client->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureUploadPart);

auto block_blob_client = client->GetBlockBlobClient(dest_blob);
@@ -269,10 +266,9 @@ void copyDataToAzureBlobStorageFile(
const String & dest_container_for_logging,
const String & dest_blob,
std::shared_ptr<const AzureObjectStorageSettings> settings,
-ThreadPoolCallbackRunnerUnsafe<void> schedule,
-bool for_disk_azure_blob_storage)
+ThreadPoolCallbackRunnerUnsafe<void> schedule)
{
-UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, for_disk_azure_blob_storage, &Poco::Logger::get("copyDataToAzureBlobStorageFile")};
+UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, &Poco::Logger::get("copyDataToAzureBlobStorageFile")};
helper.performCopy();
}

@@ -288,14 +284,13 @@ void copyAzureBlobStorageFile(
const String & dest_blob,
std::shared_ptr<const AzureObjectStorageSettings> settings,
const ReadSettings & read_settings,
-ThreadPoolCallbackRunnerUnsafe<void> schedule,
-bool for_disk_azure_blob_storage)
+ThreadPoolCallbackRunnerUnsafe<void> schedule)
{

if (settings->use_native_copy)
{
ProfileEvents::increment(ProfileEvents::AzureCopyObject);
-if (for_disk_azure_blob_storage)
+if (dest_client->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);

auto block_blob_client_src = src_client->GetBlockBlobClient(src_blob);
@@ -330,7 +325,7 @@ void copyAzureBlobStorageFile(
settings->max_single_download_retries);
};

-UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, for_disk_azure_blob_storage, &Poco::Logger::get("copyAzureBlobStorageFile")};
+UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, &Poco::Logger::get("copyAzureBlobStorageFile")};
helper.performCopy();
}
}

@@ -31,8 +31,7 @@ void copyAzureBlobStorageFile(
const String & dest_blob,
std::shared_ptr<const AzureObjectStorageSettings> settings,
const ReadSettings & read_settings,
-ThreadPoolCallbackRunnerUnsafe<void> schedule_ = {},
-bool for_disk_azure_blob_storage = false);
+ThreadPoolCallbackRunnerUnsafe<void> schedule_ = {});


/// Copies data from any seekable source to AzureBlobStorage.
@@ -48,8 +47,7 @@ void copyDataToAzureBlobStorageFile(
const String & dest_container_for_logging,
const String & dest_blob,
std::shared_ptr<const AzureObjectStorageSettings> settings,
-ThreadPoolCallbackRunnerUnsafe<void> schedule_ = {},
-bool for_disk_azure_blob_storage = false);
+ThreadPoolCallbackRunnerUnsafe<void> schedule_ = {});

}
@@ -314,7 +314,7 @@ size_t ReadBufferFromS3::getFileSize()
if (file_size)
return *file_size;

-auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id, request_settings, /* for_disk_s3= */ read_settings.for_object_storage);
+auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id, request_settings);

file_size = object_size;
return *file_size;
@@ -415,7 +415,7 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
}

ProfileEvents::increment(ProfileEvents::S3GetObject);
-if (read_settings.for_object_storage)
+if (client_ptr->isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3GetObject);

ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3InitMicroseconds);
@@ -127,9 +127,6 @@ struct ReadSettings
bool http_skip_not_found_url_for_globs = true;
bool http_make_head_request = true;

-/// Monitoring
-bool for_object_storage = false; // to choose which profile events should be incremented
-
ReadSettings adjustBufferSize(size_t file_size) const
{
ReadSettings res = *this;
@@ -384,7 +384,8 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const

/// The next call is NOT a recursive call
/// This is a virtual call Aws::S3::S3Client::HeadObject(const Model::HeadObjectRequest&)
-return HeadObject(static_cast<const Model::HeadObjectRequest&>(request));
+return enrichErrorMessage(
+HeadObject(static_cast<const Model::HeadObjectRequest&>(request)));
}

/// For each request, we wrap the request functions from Aws::S3::Client with doRequest
@@ -404,7 +405,8 @@ Model::ListObjectsOutcome Client::ListObjects(ListObjectsRequest & request) cons

Model::GetObjectOutcome Client::GetObject(GetObjectRequest & request) const
{
-return doRequest(request, [this](const Model::GetObjectRequest & req) { return GetObject(req); });
+return enrichErrorMessage(
+doRequest(request, [this](const Model::GetObjectRequest & req) { return GetObject(req); }));
}

Model::AbortMultipartUploadOutcome Client::AbortMultipartUpload(AbortMultipartUploadRequest & request) const
@@ -652,14 +654,14 @@ Client::doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request

if constexpr (IsReadMethod)
{
-if (client_configuration.for_disk_s3)
+if (isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3ReadRequestsErrors);
else
ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors);
}
else
{
-if (client_configuration.for_disk_s3)
+if (isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3WriteRequestsErrors);
else
ProfileEvents::increment(ProfileEvents::S3WriteRequestsErrors);
@@ -689,6 +691,23 @@ Client::doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request
return doRequest(request, with_retries);
}

+template <typename RequestResult>
+RequestResult Client::enrichErrorMessage(RequestResult && outcome) const
+{
+if (outcome.IsSuccess() || !isClientForDisk())
+return std::forward<RequestResult>(outcome);
+
+String enriched_message = fmt::format(
+"{} {}",
+outcome.GetError().GetMessage(),
+"This error happened for S3 disk.");
+
+auto error = outcome.GetError();
+error.SetMessage(enriched_message);
+
+return RequestResult(error);
+}
+
bool Client::supportsMultiPartCopy() const
{
return provider_type != ProviderType::GCS;
@@ -214,6 +214,11 @@ public:

bool isS3ExpressBucket() const { return client_settings.is_s3express_bucket; }

+bool isClientForDisk() const
+{
+return client_configuration.for_disk_s3;
+}
+
private:
friend struct ::MockS3::Client;

@@ -265,6 +270,9 @@ private:
bool checkIfWrongRegionDefined(const std::string & bucket, const Aws::S3::S3Error & error, std::string & region) const;
void insertRegionOverride(const std::string & bucket, const std::string & region) const;

+template <typename RequestResult>
+RequestResult enrichErrorMessage(RequestResult && outcome) const;
+
String initial_endpoint;
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;
PocoHTTPClientConfiguration client_configuration;
@@ -140,7 +140,7 @@ namespace
fillCreateMultipartRequest(request);

ProfileEvents::increment(ProfileEvents::S3CreateMultipartUpload);
-if (for_disk_s3)
+if (client_ptr->isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3CreateMultipartUpload);

auto outcome = client_ptr->CreateMultipartUpload(request);
@@ -189,7 +189,7 @@ namespace
for (size_t retries = 1;; ++retries)
{
ProfileEvents::increment(ProfileEvents::S3CompleteMultipartUpload);
-if (for_disk_s3)
+if (client_ptr->isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3CompleteMultipartUpload);

auto outcome = client_ptr->CompleteMultipartUpload(request);
@@ -239,7 +239,7 @@ namespace
void checkObjectAfterUpload()
{
LOG_TRACE(log, "Checking object {} exists after upload", dest_key);
-S3::checkObjectExists(*client_ptr, dest_bucket, dest_key, {}, request_settings, {}, "Immediately after upload");
+S3::checkObjectExists(*client_ptr, dest_bucket, dest_key, {}, request_settings, "Immediately after upload");
LOG_TRACE(log, "Object {} exists after upload", dest_key);
}

@@ -528,7 +528,7 @@ namespace
for (size_t retries = 1;; ++retries)
{
ProfileEvents::increment(ProfileEvents::S3PutObject);
-if (for_disk_s3)
+if (client_ptr->isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3PutObject);

Stopwatch watch;
@@ -615,7 +615,7 @@ namespace
auto & req = typeid_cast<S3::UploadPartRequest &>(request);

ProfileEvents::increment(ProfileEvents::S3UploadPart);
-if (for_disk_s3)
+if (client_ptr->isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3UploadPart);

auto outcome = client_ptr->UploadPart(req);
@@ -726,7 +726,7 @@ namespace
for (size_t retries = 1;; ++retries)
{
ProfileEvents::increment(ProfileEvents::S3CopyObject);
-if (for_disk_s3)
+if (client_ptr->isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3CopyObject);

auto outcome = client_ptr->CopyObject(request);
@@ -830,7 +830,7 @@ namespace
auto & req = typeid_cast<S3::UploadPartCopyRequest &>(request);

ProfileEvents::increment(ProfileEvents::S3UploadPartCopy);
-if (for_disk_s3)
+if (client_ptr->isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3UploadPartCopy);

auto outcome = client_ptr->UploadPartCopy(req);
@@ -25,10 +25,10 @@ namespace DB::S3
namespace
{
Aws::S3::Model::HeadObjectOutcome headObject(
-const S3::Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
+const S3::Client & client, const String & bucket, const String & key, const String & version_id)
{
ProfileEvents::increment(ProfileEvents::S3HeadObject);
-if (for_disk_s3)
+if (client.isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3HeadObject);

S3::HeadObjectRequest req;
@@ -44,9 +44,9 @@ namespace
/// Performs a request to get the size and last modification time of an object.
std::pair<std::optional<ObjectInfo>, Aws::S3::S3Error> tryGetObjectInfo(
const S3::Client & client, const String & bucket, const String & key, const String & version_id,
-const S3Settings::RequestSettings & /*request_settings*/, bool with_metadata, bool for_disk_s3)
+const S3Settings::RequestSettings & /*request_settings*/, bool with_metadata)
{
-auto outcome = headObject(client, bucket, key, version_id, for_disk_s3);
+auto outcome = headObject(client, bucket, key, version_id);
if (!outcome.IsSuccess())
return {std::nullopt, outcome.GetError()};

@@ -75,10 +75,9 @@ ObjectInfo getObjectInfo(
const String & version_id,
const S3Settings::RequestSettings & request_settings,
bool with_metadata,
-bool for_disk_s3,
bool throw_on_error)
{
-auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, with_metadata, for_disk_s3);
+auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, with_metadata);
if (object_info)
{
return *object_info;
@@ -98,10 +97,9 @@ size_t getObjectSize(
const String & key,
const String & version_id,
const S3Settings::RequestSettings & request_settings,
-bool for_disk_s3,
bool throw_on_error)
{
-return getObjectInfo(client, bucket, key, version_id, request_settings, {}, for_disk_s3, throw_on_error).size;
+return getObjectInfo(client, bucket, key, version_id, request_settings, {}, throw_on_error).size;
}

bool objectExists(
@@ -109,10 +107,9 @@ bool objectExists(
const String & bucket,
const String & key,
const String & version_id,
-const S3Settings::RequestSettings & request_settings,
-bool for_disk_s3)
+const S3Settings::RequestSettings & request_settings)
{
-auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, {}, for_disk_s3);
+auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, {});
if (object_info)
return true;

@@ -130,10 +127,9 @@ void checkObjectExists(
const String & key,
const String & version_id,
const S3Settings::RequestSettings & request_settings,
-bool for_disk_s3,
std::string_view description)
{
-auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, {}, for_disk_s3);
+auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, {});
if (object_info)
return;
throw S3Exception(error.GetErrorType(), "{}Object {} in bucket {} suddenly disappeared: {}",
@@ -26,7 +26,6 @@ ObjectInfo getObjectInfo(
const String & version_id = {},
const S3Settings::RequestSettings & request_settings = {},
bool with_metadata = false,
-bool for_disk_s3 = false,
bool throw_on_error = true);

size_t getObjectSize(
@@ -35,7 +34,6 @@ size_t getObjectSize(
const String & key,
const String & version_id = {},
const S3Settings::RequestSettings & request_settings = {},
-bool for_disk_s3 = false,
bool throw_on_error = true);

bool objectExists(
@@ -43,8 +41,7 @@ bool objectExists(
const String & bucket,
const String & key,
const String & version_id = {},
-const S3Settings::RequestSettings & request_settings = {},
-bool for_disk_s3 = false);
+const S3Settings::RequestSettings & request_settings = {});

/// Throws an exception if a specified object doesn't exist. `description` is used as a part of the error message.
void checkObjectExists(
@@ -53,7 +50,6 @@ void checkObjectExists(
const String & key,
const String & version_id = {},
const S3Settings::RequestSettings & request_settings = {},
-bool for_disk_s3 = false,
std::string_view description = {});

bool isNotFoundError(Aws::S3::S3Errors error);
@@ -214,9 +214,9 @@ void WriteBufferFromS3::finalizeImpl()

if (request_settings.check_objects_after_upload)
{
-S3::checkObjectExists(*client_ptr, bucket, key, {}, request_settings, /* for_disk_s3= */ write_settings.for_object_storage, "Immediately after upload");
+S3::checkObjectExists(*client_ptr, bucket, key, {}, request_settings, "Immediately after upload");

-size_t actual_size = S3::getObjectSize(*client_ptr, bucket, key, {}, request_settings, /* for_disk_s3= */ write_settings.for_object_storage);
+size_t actual_size = S3::getObjectSize(*client_ptr, bucket, key, {}, request_settings);
if (actual_size != total_size)
throw Exception(
ErrorCodes::S3_ERROR,
@@ -390,7 +390,7 @@ void WriteBufferFromS3::createMultipartUpload()
client_ptr->setKMSHeaders(req);

ProfileEvents::increment(ProfileEvents::S3CreateMultipartUpload);
-if (write_settings.for_object_storage)
+if (client_ptr->isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3CreateMultipartUpload);

Stopwatch watch;
@@ -429,7 +429,7 @@ void WriteBufferFromS3::abortMultipartUpload()
req.SetUploadId(multipart_upload_id);

ProfileEvents::increment(ProfileEvents::S3AbortMultipartUpload);
-if (write_settings.for_object_storage)
+if (client_ptr->isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3AbortMultipartUpload);

Stopwatch watch;
@@ -530,7 +530,7 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data)
getShortLogDetails(), data_size, part_number);

ProfileEvents::increment(ProfileEvents::S3UploadPart);
-if (write_settings.for_object_storage)
+if (client_ptr->isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3UploadPart);

auto & request = std::get<0>(*worker_data);
@@ -606,7 +606,7 @@ void WriteBufferFromS3::completeMultipartUpload()
for (size_t i = 0; i < max_retry; ++i)
{
ProfileEvents::increment(ProfileEvents::S3CompleteMultipartUpload);
-if (write_settings.for_object_storage)
+if (client_ptr->isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3CompleteMultipartUpload);

Stopwatch watch;
@@ -689,7 +689,7 @@ void WriteBufferFromS3::makeSinglepartUpload(WriteBufferFromS3::PartData && data
for (size_t i = 0; i < max_retry; ++i)
{
ProfileEvents::increment(ProfileEvents::S3PutObject);
-if (write_settings.for_object_storage)
+if (client_ptr->isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3PutObject);

ResourceCost cost = request.GetContentLength();
@@ -25,9 +25,6 @@ struct WriteSettings
bool s3_allow_parallel_part_upload = true;
bool azure_allow_parallel_part_upload = true;

-/// Monitoring
-bool for_object_storage = false; // to choose which profile events should be incremented
-
bool operator==(const WriteSettings & other) const = default;
};
@@ -33,8 +33,6 @@ public:

bool update(const ContextPtr & context);

void connect(const ContextPtr & context);

bool withGlobs() const { return blob_path.find_first_of("*?{") != std::string::npos; }

bool withWildcard() const
@@ -1866,7 +1866,6 @@ namespace
configuration.url.version_id,
configuration.request_settings,
/*with_metadata=*/ false,
-/*for_disk_s3=*/ false,
/*throw_on_error= */ false).last_modification_time;
}
@@ -2,6 +2,8 @@

import logging
import pytest
+import os
+import minio

from helpers.cluster import ClickHouseCluster
from helpers.mock_servers import start_s3_mock
@@ -608,3 +610,68 @@ def test_adaptive_timeouts(cluster, broken_s3, node_name):
    else:
        assert s3_use_adaptive_timeouts == "0"
        assert s3_errors == 0
+
+
+def test_no_key_found_disk(cluster, broken_s3):
+    node = cluster.instances["node"]
+
+    node.query(
+        """
+        CREATE TABLE no_key_found_disk (
+            id Int64
+        ) ENGINE=MergeTree()
+        ORDER BY id
+        SETTINGS
+            storage_policy='s3'
+        """
+    )
+
+    uuid = node.query(
+        """
+        SELECT uuid
+        FROM system.tables
+        WHERE name = 'no_key_found_disk'
+        """
+    ).strip()
+    assert uuid
+
+    node.query("INSERT INTO no_key_found_disk VALUES (1)")
+
+    data = node.query("SELECT * FROM no_key_found_disk").strip()
+
+    assert data == "1"
+
+    remote_pathes = (
+        node.query(
+            f"""
+            SELECT remote_path
+            FROM system.remote_data_paths
+            WHERE
+                local_path LIKE '%{uuid}%'
+                AND local_path LIKE '%.bin%'
+            ORDER BY ALL
+            """
+        )
+        .strip()
+        .split()
+    )
+
+    assert len(remote_pathes) > 0
+
+    # path_prefix = os.path.join('/', cluster.minio_bucket)
+    for path in remote_pathes:
+        # name = os.path.relpath(path, path_prefix)
+        # assert False, f"deleting full {path} prefix {path_prefix} name {name}"
+        assert cluster.minio_client.stat_object(cluster.minio_bucket, path).size > 0
+        cluster.minio_client.remove_object(cluster.minio_bucket, path)
+        with pytest.raises(Exception) as exc_info:
+            size = cluster.minio_client.stat_object(cluster.minio_bucket, path).size
+            assert size == 0
+        assert "code: NoSuchKey" in str(exc_info.value)
+
+    error = node.query_and_get_error("SELECT * FROM no_key_found_disk").strip()
+
+    assert (
+        "DB::Exception: The specified key does not exist. This error happened for S3 disk."
+        in error
+    )