diff --git a/src/Backups/BackupIO_AzureBlobStorage.cpp b/src/Backups/BackupIO_AzureBlobStorage.cpp
index 9cf5c433826..a3998431674 100644
--- a/src/Backups/BackupIO_AzureBlobStorage.cpp
+++ b/src/Backups/BackupIO_AzureBlobStorage.cpp
@@ -14,6 +14,7 @@
 #include
 #include
+#include
 #include
@@ -38,6 +39,8 @@ BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage(
     , configuration(configuration_)
 {
     auto client_ptr = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
+    client_ptr->SetClickhouseOptions(Azure::Storage::Blobs::ClickhouseClientOptions{.IsClientForDisk=true});
+
     object_storage = std::make_unique<AzureObjectStorage>("BackupReaderAzureBlobStorage",
         std::move(client_ptr),
         StorageAzureBlob::createSettings(context_),
@@ -97,8 +100,7 @@ void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup,
             /* dest_path */ blob_path[0],
             settings,
             read_settings,
-            threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupRDAzure"),
-            /* for_disk_azure_blob_storage= */ true);
+            threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupRDAzure"));
 
         return file_size;
     };
@@ -123,6 +125,8 @@ BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
     , configuration(configuration_)
 {
     auto client_ptr = StorageAzureBlob::createClient(configuration, /* is_read_only */ false, attempt_to_create_container);
+    client_ptr->SetClickhouseOptions(Azure::Storage::Blobs::ClickhouseClientOptions{.IsClientForDisk=true});
+
     object_storage = std::make_unique<AzureObjectStorage>("BackupWriterAzureBlobStorage",
         std::move(client_ptr),
         StorageAzureBlob::createSettings(context_),
@@ -177,8 +181,7 @@ void BackupWriterAzureBlobStorage::copyFile(const String & destination, const String & source,
         /* dest_path */ destination,
         settings,
         read_settings,
-        threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"),
-        /* for_disk_azure_blob_storage= */ true);
+        threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"));
 }
 
 void BackupWriterAzureBlobStorage::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
diff --git a/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp b/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp
index 48b4ed23af0..da1ea65f2ea 100644
--- a/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp
+++ b/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp
@@ -225,7 +225,7 @@ void ReadBufferFromAzureBlobStorage::initialize()
     try
     {
         ProfileEvents::increment(ProfileEvents::AzureGetObject);
-        if (read_settings.for_object_storage)
+        if (blob_container_client->GetClickhouseOptions().IsClientForDisk)
            ProfileEvents::increment(ProfileEvents::DiskAzureGetObject);
 
         auto download_response = blob_client->Download(download_options);
@@ -279,7 +279,7 @@ size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t range_begin,
     try
     {
         ProfileEvents::increment(ProfileEvents::AzureGetObject);
-        if (read_settings.for_object_storage)
+        if (blob_container_client->GetClickhouseOptions().IsClientForDisk)
            ProfileEvents::increment(ProfileEvents::DiskAzureGetObject);
 
         Azure::Storage::Blobs::DownloadBlobOptions download_options;
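Reviewer note: the hunks above establish the pattern the rest of this PR follows. Instead of threading a for_disk_azure_blob_storage / for_disk_s3 boolean through every call chain (and through ReadSettings/WriteSettings), the client object is tagged once where it is created and queried wherever a DiskAzure*/DiskS3* profile event may need to be incremented. Below is a minimal runnable sketch of the idea; TaggedClient is a hypothetical stand-in for the patched BlobContainerClient, and the ClickhouseClientOptions / SetClickhouseOptions / GetClickhouseOptions hooks are assumed to exist in ClickHouse's vendored Azure SDK, as this diff implies.

    #include <iostream>

    // Hypothetical mirror of the option struct the diff relies on.
    struct ClickhouseClientOptions
    {
        bool IsClientForDisk = false;
    };

    // Stand-in for the patched Azure BlobContainerClient.
    class TaggedClient
    {
    public:
        void SetClickhouseOptions(ClickhouseClientOptions options) { clickhouse_options = options; }
        const ClickhouseClientOptions & GetClickhouseOptions() const { return clickhouse_options; }

    private:
        ClickhouseClientOptions clickhouse_options;
    };

    int main()
    {
        /// Tag once, at the point where the client is created for a disk or backup...
        TaggedClient client;
        client.SetClickhouseOptions(ClickhouseClientOptions{.IsClientForDisk = true});

        /// ...and any later request can pick its profile events without a flag
        /// being passed down the call chain.
        if (client.GetClickhouseOptions().IsClientForDisk)
            std::cout << "would increment DiskAzureGetObject\n";
    }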
diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.cpp b/src/Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.cpp
index cf84fe46579..43bbb5cad4b 100644
--- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.cpp
+++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.cpp
@@ -5,6 +5,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -206,6 +207,8 @@ Azure::Storage::Blobs::BlobClientOptions getAzureBlobClientOptions(const Poco::Util::AbstractConfiguration & config,
     client_options.Retry = retry_options;
     client_options.Transport.Transport = std::make_shared<Azure::Core::Http::CurlTransport>(curl_options);
 
+    client_options.ClickhouseOptions = Azure::Storage::Blobs::ClickhouseClientOptions{.IsClientForDisk=true};
+
     return client_options;
 }
 
diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp
index 0f45f40288e..38a7db5702c 100644
--- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp
+++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp
@@ -69,7 +69,8 @@ private:
     bool getBatchAndCheckNext(RelativePathsWithMetadata & batch) override
     {
         ProfileEvents::increment(ProfileEvents::AzureListObjects);
-        ProfileEvents::increment(ProfileEvents::DiskAzureListObjects);
+        if (client->GetClickhouseOptions().IsClientForDisk)
+            ProfileEvents::increment(ProfileEvents::DiskAzureListObjects);
 
         batch.clear();
         auto outcome = client->ListBlobs(options);
@@ -130,7 +131,8 @@ bool AzureObjectStorage::exists(const StoredObject & object) const
     options.PageSizeHint = 1;
 
     ProfileEvents::increment(ProfileEvents::AzureListObjects);
-    ProfileEvents::increment(ProfileEvents::DiskAzureListObjects);
+    if (client_ptr->GetClickhouseOptions().IsClientForDisk)
+        ProfileEvents::increment(ProfileEvents::DiskAzureListObjects);
 
     auto blobs_list_response = client_ptr->ListBlobs(options);
     auto blobs_list = blobs_list_response.Blobs;
@@ -169,7 +171,8 @@ void AzureObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children,
     while (true)
     {
         ProfileEvents::increment(ProfileEvents::AzureListObjects);
-        ProfileEvents::increment(ProfileEvents::DiskAzureListObjects);
+        if (client_ptr->GetClickhouseOptions().IsClientForDisk)
+            ProfileEvents::increment(ProfileEvents::DiskAzureListObjects);
 
         blob_list_response = client_ptr->ListBlobs(options);
         auto blobs_list = blob_list_response.Blobs;
@@ -298,7 +301,8 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NOLINT
 void AzureObjectStorage::removeObjectImpl(const StoredObject & object, const SharedAzureClientPtr & client_ptr, bool if_exists)
 {
     ProfileEvents::increment(ProfileEvents::AzureDeleteObjects);
-    ProfileEvents::increment(ProfileEvents::DiskAzureDeleteObjects);
+    if (client_ptr->GetClickhouseOptions().IsClientForDisk)
+        ProfileEvents::increment(ProfileEvents::DiskAzureDeleteObjects);
 
     const auto & path = object.remote_path;
     LOG_TEST(log, "Removing single object: {}", path);
@@ -353,13 +357,14 @@ void AzureObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
 
 ObjectMetadata AzureObjectStorage::getObjectMetadata(const std::string & path) const
 {
-    ProfileEvents::increment(ProfileEvents::AzureGetProperties);
-    ProfileEvents::increment(ProfileEvents::DiskAzureGetProperties);
-
     auto client_ptr = client.get();
     auto blob_client = client_ptr->GetBlobClient(path);
     auto properties = blob_client.GetProperties().Value;
 
+    ProfileEvents::increment(ProfileEvents::AzureGetProperties);
+    if (client_ptr->GetClickhouseOptions().IsClientForDisk)
+        ProfileEvents::increment(ProfileEvents::DiskAzureGetProperties);
+
     ObjectMetadata result;
     result.size_bytes = properties.BlobSize;
     if (!properties.Metadata.empty())
@@ -391,7 +396,8 @@ void AzureObjectStorage::copyObject( /// NOLINT
     }
 
     ProfileEvents::increment(ProfileEvents::AzureCopyObject);
-    ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);
+    if (client_ptr->GetClickhouseOptions().IsClientForDisk)
+        ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);
 
     dest_blob_client.CopyFromUri(source_blob_client.GetUrl(), copy_options);
 }
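Reviewer note: before this change, the AzureObjectStorage methods above bumped the DiskAzure* counters unconditionally, so non-disk uses of this wrapper (backups, the table engine) were also counted as disk traffic. With the gating, the generic counter still always increments while the Disk* twin fires only for clients tagged for a disk. A runnable sketch of that counting behaviour, with plain atomics standing in for ProfileEvents (all names here are illustrative):

    #include <atomic>
    #include <cstdio>

    // Stand-ins for the ProfileEvents counters named in the hunks above.
    std::atomic<unsigned> AzureListObjects{0};
    std::atomic<unsigned> DiskAzureListObjects{0};

    struct ClickhouseClientOptions { bool IsClientForDisk = false; };

    struct Client
    {
        ClickhouseClientOptions options;
        const ClickhouseClientOptions & GetClickhouseOptions() const { return options; }
    };

    void onListObjects(const Client & client)
    {
        ++AzureListObjects;                                  // always
        if (client.GetClickhouseOptions().IsClientForDisk)   // disk clients only
            ++DiskAzureListObjects;
    }

    int main()
    {
        Client disk_client{.options = {.IsClientForDisk = true}};
        Client table_function_client{};
        onListObjects(disk_client);
        onListObjects(table_function_client);
        std::printf("AzureListObjects=%u DiskAzureListObjects=%u\n",
                    AzureListObjects.load(), DiskAzureListObjects.load());  // 2 and 1
    }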
diff --git a/src/Disks/ObjectStorages/IObjectStorage.cpp b/src/Disks/ObjectStorages/IObjectStorage.cpp
index 5fd9695ec9e..accef9a08ab 100644
--- a/src/Disks/ObjectStorages/IObjectStorage.cpp
+++ b/src/Disks/ObjectStorages/IObjectStorage.cpp
@@ -84,16 +84,12 @@ const std::string & IObjectStorage::getCacheName() const
 
 ReadSettings IObjectStorage::patchSettings(const ReadSettings & read_settings) const
 {
-    ReadSettings settings{read_settings};
-    settings.for_object_storage = true;
-    return settings;
+    return read_settings;
 }
 
 WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings) const
 {
-    WriteSettings settings{write_settings};
-    settings.for_object_storage = true;
-    return settings;
+    return write_settings;
 }
 
 }
diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
index 2f0d93907ae..2eae8877f87 100644
--- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
+++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
@@ -158,7 +158,7 @@ private:
 bool S3ObjectStorage::exists(const StoredObject & object) const
 {
     auto settings_ptr = s3_settings.get();
-    return S3::objectExists(*client.get(), uri.bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
+    return S3::objectExists(*client.get(), uri.bucket, object.remote_path, {}, settings_ptr->request_settings);
 }
 
 std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
@@ -425,7 +425,7 @@ void S3ObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
 
 std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::string & path) const
 {
     auto settings_ptr = s3_settings.get();
-    auto object_info = S3::getObjectInfo(*client.get(), uri.bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
+    auto object_info = S3::getObjectInfo(*client.get(), uri.bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* throw_on_error= */ false);
 
     if (object_info.size == 0 && object_info.last_modification_time == 0 && object_info.metadata.empty())
         return {};
@@ -441,7 +441,7 @@ std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::string & path) const
 
 ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) const
 {
     auto settings_ptr = s3_settings.get();
-    auto object_info = S3::getObjectInfo(*client.get(), uri.bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
+    auto object_info = S3::getObjectInfo(*client.get(), uri.bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true);
 
     ObjectMetadata result;
     result.size_bytes = object_info.size;
@@ -464,9 +464,11 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
 {
     auto current_client = dest_s3->client.get();
     auto settings_ptr = s3_settings.get();
-    auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
+    auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {}, settings_ptr->request_settings);
     auto scheduler = threadPoolCallbackRunnerUnsafe<void>(getThreadPoolWriter(), "S3ObjStor_copy");
-    try {
+
+    try
+    {
         copyS3File(
             current_client,
             uri.bucket,
@@ -479,8 +481,7 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
             patchSettings(read_settings),
             BlobStorageLogWriter::create(disk_name),
             object_to_attributes,
-            scheduler,
-            /* for_disk_s3= */ true);
+            scheduler);
         return;
     }
     catch (S3Exception & exc)
@@ -506,8 +507,9 @@ void S3ObjectStorage::copyObject( // NOLINT
 {
     auto current_client = client.get();
     auto settings_ptr = s3_settings.get();
-    auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
+    auto size = S3::getObjectSize(*current_client, uri.bucket, object_from.remote_path, {}, settings_ptr->request_settings);
     auto scheduler = threadPoolCallbackRunnerUnsafe<void>(getThreadPoolWriter(), "S3ObjStor_copy");
+
     copyS3File(current_client,
         uri.bucket,
         object_from.remote_path,
@@ -519,8 +521,7 @@ void S3ObjectStorage::copyObject( // NOLINT
         patchSettings(read_settings),
         BlobStorageLogWriter::create(disk_name),
         object_to_attributes,
-        scheduler,
-        /* for_disk_s3= */ true);
+        scheduler);
 }
 
 void S3ObjectStorage::setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_)
diff --git a/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.cpp b/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.cpp
index ef8c01f4b5e..769f1a184f6 100644
--- a/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.cpp
+++ b/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.cpp
@@ -46,7 +46,6 @@ namespace
             const String & dest_blob_,
             std::shared_ptr<const AzureObjectStorageSettings> settings_,
             ThreadPoolCallbackRunnerUnsafe<void> schedule_,
-            bool for_disk_azure_blob_storage_,
             const Poco::Logger * log_)
             : create_read_buffer(create_read_buffer_)
             , client(client_)
@@ -56,7 +55,6 @@ namespace
             , dest_blob(dest_blob_)
             , settings(settings_)
             , schedule(schedule_)
-            , for_disk_azure_blob_storage(for_disk_azure_blob_storage_)
             , log(log_)
             , max_single_part_upload_size(settings_->max_single_part_upload_size)
         {
@@ -73,7 +71,6 @@ namespace
         const String & dest_blob;
         std::shared_ptr<const AzureObjectStorageSettings> settings;
         ThreadPoolCallbackRunnerUnsafe<void> schedule;
-        bool for_disk_azure_blob_storage;
         const Poco::Logger * log;
         size_t max_single_part_upload_size;
 
@@ -217,7 +214,7 @@ namespace
        void processUploadPartRequest(UploadPartTask & task)
         {
             ProfileEvents::increment(ProfileEvents::AzureUploadPart);
-            if (for_disk_azure_blob_storage)
+            if (client->GetClickhouseOptions().IsClientForDisk)
                 ProfileEvents::increment(ProfileEvents::DiskAzureUploadPart);
 
             auto block_blob_client = client->GetBlockBlobClient(dest_blob);
@@ -269,10 +266,9 @@ void copyDataToAzureBlobStorageFile(
     const String & dest_container_for_logging,
     const String & dest_blob,
     std::shared_ptr<const AzureObjectStorageSettings> settings,
-    ThreadPoolCallbackRunnerUnsafe<void> schedule,
-    bool for_disk_azure_blob_storage)
+    ThreadPoolCallbackRunnerUnsafe<void> schedule)
 {
-    UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, for_disk_azure_blob_storage, &Poco::Logger::get("copyDataToAzureBlobStorageFile")};
+    UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, &Poco::Logger::get("copyDataToAzureBlobStorageFile")};
     helper.performCopy();
 }
 
@@ -288,14 +284,13 @@ void copyAzureBlobStorageFile(
     const String & dest_blob,
     std::shared_ptr<const AzureObjectStorageSettings> settings,
     const ReadSettings & read_settings,
-    ThreadPoolCallbackRunnerUnsafe<void> schedule,
-    bool for_disk_azure_blob_storage)
+    ThreadPoolCallbackRunnerUnsafe<void> schedule)
 {
     if (settings->use_native_copy)
     {
         ProfileEvents::increment(ProfileEvents::AzureCopyObject);
-        if (for_disk_azure_blob_storage)
+        if (dest_client->GetClickhouseOptions().IsClientForDisk)
             ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);
 
         auto block_blob_client_src = src_client->GetBlockBlobClient(src_blob);
@@ -330,7 +325,7 @@ void copyAzureBlobStorageFile(
                 settings->max_single_download_retries);
         };
 
-        UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, for_disk_azure_blob_storage, &Poco::Logger::get("copyAzureBlobStorageFile")};
+        UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, &Poco::Logger::get("copyAzureBlobStorageFile")};
         helper.performCopy();
     }
 }
diff --git a/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.h b/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.h
index 170a3d7f6aa..6ad54923ab5 100644
--- a/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.h
+++ b/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.h
@@ -31,8 +31,7 @@ void copyAzureBlobStorageFile(
     const String & dest_blob,
     std::shared_ptr<const AzureObjectStorageSettings> settings,
     const ReadSettings & read_settings,
-    ThreadPoolCallbackRunnerUnsafe<void> schedule_ = {},
-    bool for_disk_azure_blob_storage = false);
+    ThreadPoolCallbackRunnerUnsafe<void> schedule_ = {});
 
 
 /// Copies data from any seekable source to AzureBlobStorage.
@@ -48,8 +47,7 @@ void copyDataToAzureBlobStorageFile(
     const String & dest_container_for_logging,
     const String & dest_blob,
     std::shared_ptr<const AzureObjectStorageSettings> settings,
-    ThreadPoolCallbackRunnerUnsafe<void> schedule_ = {},
-    bool for_disk_azure_blob_storage = false);
+    ThreadPoolCallbackRunnerUnsafe<void> schedule_ = {});
 
 }
diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp
index 491ff253066..813546aa052 100644
--- a/src/IO/ReadBufferFromS3.cpp
+++ b/src/IO/ReadBufferFromS3.cpp
@@ -314,7 +314,7 @@ size_t ReadBufferFromS3::getFileSize()
     if (file_size)
         return *file_size;
 
-    auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id, request_settings, /* for_disk_s3= */ read_settings.for_object_storage);
+    auto object_size = S3::getObjectSize(*client_ptr, bucket, key, version_id, request_settings);
 
     file_size = object_size;
     return *file_size;
@@ -415,7 +415,7 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, size_t range_begin,
     }
 
     ProfileEvents::increment(ProfileEvents::S3GetObject);
-    if (read_settings.for_object_storage)
+    if (client_ptr->isClientForDisk())
         ProfileEvents::increment(ProfileEvents::DiskS3GetObject);
 
     ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ReadBufferFromS3InitMicroseconds);
diff --git a/src/IO/ReadSettings.h b/src/IO/ReadSettings.h
index 6a0cac35878..6c44861eae3 100644
--- a/src/IO/ReadSettings.h
+++ b/src/IO/ReadSettings.h
@@ -127,9 +127,6 @@ struct ReadSettings
     bool http_skip_not_found_url_for_globs = true;
     bool http_make_head_request = true;
 
-    /// Monitoring
-    bool for_object_storage = false; // to choose which profile events should be incremented
-
     ReadSettings adjustBufferSize(size_t file_size) const
     {
         ReadSettings res = *this;
diff --git a/src/IO/S3/Client.cpp b/src/IO/S3/Client.cpp
index 6c138eb3bff..9229342b8c1 100644
--- a/src/IO/S3/Client.cpp
+++ b/src/IO/S3/Client.cpp
@@ -384,7 +384,8 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
 
     /// The next call is NOT a recurcive call
     /// This is a virtuall call Aws::S3::S3Client::HeadObject(const Model::HeadObjectRequest&)
-    return HeadObject(static_cast<const Model::HeadObjectRequest &>(request));
+    return enrichErrorMessage(
+        HeadObject(static_cast<const Model::HeadObjectRequest &>(request)));
 }
 
 /// For each request, we wrap the request functions from Aws::S3::Client with doRequest
@@ -404,7 +405,8 @@ Model::ListObjectsOutcome Client::ListObjects(ListObjectsRequest & request) const
 
 Model::GetObjectOutcome Client::GetObject(GetObjectRequest & request) const
 {
-    return doRequest(request, [this](const Model::GetObjectRequest & req) { return GetObject(req); });
+    return enrichErrorMessage(
+        doRequest(request, [this](const Model::GetObjectRequest & req) { return GetObject(req); }));
 }
 
 Model::AbortMultipartUploadOutcome Client::AbortMultipartUpload(AbortMultipartUploadRequest & request) const
@@ -652,14 +654,14 @@ Client::doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request_fn) const
 
         if constexpr (IsReadMethod)
         {
-            if (client_configuration.for_disk_s3)
+            if (isClientForDisk())
                 ProfileEvents::increment(ProfileEvents::DiskS3ReadRequestsErrors);
             else
                 ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors);
         }
         else
         {
-            if (client_configuration.for_disk_s3)
+            if (isClientForDisk())
                 ProfileEvents::increment(ProfileEvents::DiskS3WriteRequestsErrors);
             else
                 ProfileEvents::increment(ProfileEvents::S3WriteRequestsErrors);
@@ -689,6 +691,23 @@ Client::doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request_fn) const
     return doRequest(request, with_retries);
 }
 
+template <typename RequestResult>
+RequestResult Client::enrichErrorMessage(RequestResult && outcome) const
+{
+    if (outcome.IsSuccess() || !isClientForDisk())
+        return std::forward<RequestResult>(outcome);
+
+    String enriched_message = fmt::format(
+        "{} {}",
+        outcome.GetError().GetMessage(),
+        "This error happened for S3 disk.");
+
+    auto error = outcome.GetError();
+    error.SetMessage(enriched_message);
+
+    return RequestResult(error);
+}
+
 bool Client::supportsMultiPartCopy() const
 {
     return provider_type != ProviderType::GCS;
diff --git a/src/IO/S3/Client.h b/src/IO/S3/Client.h
index c79ec05c8c6..bd281846343 100644
--- a/src/IO/S3/Client.h
+++ b/src/IO/S3/Client.h
@@ -214,6 +214,11 @@ public:
 
     bool isS3ExpressBucket() const { return client_settings.is_s3express_bucket; }
 
+    bool isClientForDisk() const
+    {
+        return client_configuration.for_disk_s3;
+    }
+
 private:
     friend struct ::MockS3::Client;
 
@@ -265,6 +270,9 @@ private:
     bool checkIfWrongRegionDefined(const std::string & bucket, const Aws::S3::S3Error & error, std::string & region) const;
     void insertRegionOverride(const std::string & bucket, const std::string & region) const;
 
+    template <typename RequestResult>
+    RequestResult enrichErrorMessage(RequestResult && outcome) const;
+
     String initial_endpoint;
     std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;
     PocoHTTPClientConfiguration client_configuration;
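Reviewer note: enrichErrorMessage leaves successful outcomes and non-disk clients untouched and only appends a fixed hint to the error text, which is what lets the integration test at the bottom of this PR match an exact string. A self-contained sketch of the same control flow, with simplified stand-ins for the AWS SDK outcome/error types (the real code forwards Aws outcomes and uses fmt::format):

    #include <iostream>
    #include <string>
    #include <utility>

    struct Error
    {
        std::string message;
        const std::string & GetMessage() const { return message; }
        void SetMessage(std::string m) { message = std::move(m); }
    };

    // Simplified stand-in for the SDK's Outcome<Result, Error>.
    template <typename Result>
    struct Outcome
    {
        bool success = false;
        Error error;
        bool IsSuccess() const { return success; }
        const Error & GetError() const { return error; }
    };

    template <typename RequestResult>
    RequestResult enrichErrorMessage(RequestResult && outcome, bool is_client_for_disk)
    {
        /// Success and non-disk clients pass through unchanged.
        if (outcome.IsSuccess() || !is_client_for_disk)
            return std::forward<RequestResult>(outcome);

        auto error = outcome.GetError();
        error.SetMessage(error.GetMessage() + " This error happened for S3 disk.");

        RequestResult enriched;
        enriched.error = error;
        return enriched;
    }

    int main()
    {
        Outcome<int> failed{.success = false, .error = {"The specified key does not exist."}};
        auto enriched = enrichErrorMessage(std::move(failed), /* is_client_for_disk= */ true);
        std::cout << enriched.GetError().GetMessage() << '\n';
    }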
diff --git a/src/IO/S3/copyS3File.cpp b/src/IO/S3/copyS3File.cpp
index 3b1f25ed994..549d0a569c6 100644
--- a/src/IO/S3/copyS3File.cpp
+++ b/src/IO/S3/copyS3File.cpp
@@ -140,7 +140,7 @@ namespace
         fillCreateMultipartRequest(request);
 
         ProfileEvents::increment(ProfileEvents::S3CreateMultipartUpload);
-        if (for_disk_s3)
+        if (client_ptr->isClientForDisk())
             ProfileEvents::increment(ProfileEvents::DiskS3CreateMultipartUpload);
 
         auto outcome = client_ptr->CreateMultipartUpload(request);
@@ -189,7 +189,7 @@ namespace
         for (size_t retries = 1;; ++retries)
         {
             ProfileEvents::increment(ProfileEvents::S3CompleteMultipartUpload);
-            if (for_disk_s3)
+            if (client_ptr->isClientForDisk())
                 ProfileEvents::increment(ProfileEvents::DiskS3CompleteMultipartUpload);
 
             auto outcome = client_ptr->CompleteMultipartUpload(request);
@@ -239,7 +239,7 @@ namespace
         void checkObjectAfterUpload()
         {
             LOG_TRACE(log, "Checking object {} exists after upload", dest_key);
-            S3::checkObjectExists(*client_ptr, dest_bucket, dest_key, {}, request_settings, {}, "Immediately after upload");
+            S3::checkObjectExists(*client_ptr, dest_bucket, dest_key, {}, request_settings, "Immediately after upload");
             LOG_TRACE(log, "Object {} exists after upload", dest_key);
         }
 
@@ -528,7 +528,7 @@ namespace
         for (size_t retries = 1;; ++retries)
         {
             ProfileEvents::increment(ProfileEvents::S3PutObject);
-            if (for_disk_s3)
+            if (client_ptr->isClientForDisk())
                 ProfileEvents::increment(ProfileEvents::DiskS3PutObject);
 
             Stopwatch watch;
@@ -615,7 +615,7 @@ namespace
             auto & req = typeid_cast<S3::UploadPartRequest &>(request);
 
             ProfileEvents::increment(ProfileEvents::S3UploadPart);
-            if (for_disk_s3)
+            if (client_ptr->isClientForDisk())
                 ProfileEvents::increment(ProfileEvents::DiskS3UploadPart);
 
             auto outcome = client_ptr->UploadPart(req);
@@ -726,7 +726,7 @@ namespace
         for (size_t retries = 1;; ++retries)
         {
             ProfileEvents::increment(ProfileEvents::S3CopyObject);
-            if (for_disk_s3)
+            if (client_ptr->isClientForDisk())
                 ProfileEvents::increment(ProfileEvents::DiskS3CopyObject);
 
             auto outcome = client_ptr->CopyObject(request);
@@ -830,7 +830,7 @@ namespace
             auto & req = typeid_cast<S3::UploadPartCopyRequest &>(request);
 
             ProfileEvents::increment(ProfileEvents::S3UploadPartCopy);
-            if (for_disk_s3)
+            if (client_ptr->isClientForDisk())
                 ProfileEvents::increment(ProfileEvents::DiskS3UploadPartCopy);
 
             auto outcome = client_ptr->UploadPartCopy(req);
diff --git a/src/IO/S3/getObjectInfo.cpp b/src/IO/S3/getObjectInfo.cpp
index 88f79f8d8d5..eee3da9fb74 100644
--- a/src/IO/S3/getObjectInfo.cpp
+++ b/src/IO/S3/getObjectInfo.cpp
@@ -25,10 +25,10 @@ namespace DB::S3
 namespace
 {
     Aws::S3::Model::HeadObjectOutcome headObject(
-        const S3::Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
+        const S3::Client & client, const String & bucket, const String & key, const String & version_id)
     {
         ProfileEvents::increment(ProfileEvents::S3HeadObject);
-        if (for_disk_s3)
+        if (client.isClientForDisk())
             ProfileEvents::increment(ProfileEvents::DiskS3HeadObject);
 
         S3::HeadObjectRequest req;
@@ -44,9 +44,9 @@ namespace
     /// Performs a request to get the size and last modification time of an object.
     std::pair<std::optional<ObjectInfo>, Aws::S3::S3Error> tryGetObjectInfo(
         const S3::Client & client, const String & bucket, const String & key, const String & version_id,
-        const S3Settings::RequestSettings & /*request_settings*/, bool with_metadata, bool for_disk_s3)
+        const S3Settings::RequestSettings & /*request_settings*/, bool with_metadata)
     {
-        auto outcome = headObject(client, bucket, key, version_id, for_disk_s3);
+        auto outcome = headObject(client, bucket, key, version_id);
         if (!outcome.IsSuccess())
             return {std::nullopt, outcome.GetError()};
 
@@ -75,10 +75,9 @@ ObjectInfo getObjectInfo(
     const String & version_id,
     const S3Settings::RequestSettings & request_settings,
     bool with_metadata,
-    bool for_disk_s3,
     bool throw_on_error)
 {
-    auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, with_metadata, for_disk_s3);
+    auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, with_metadata);
     if (object_info)
     {
         return *object_info;
@@ -98,10 +97,9 @@ size_t getObjectSize(
     const String & key,
     const String & version_id,
     const S3Settings::RequestSettings & request_settings,
-    bool for_disk_s3,
     bool throw_on_error)
 {
-    return getObjectInfo(client, bucket, key, version_id, request_settings, {}, for_disk_s3, throw_on_error).size;
+    return getObjectInfo(client, bucket, key, version_id, request_settings, {}, throw_on_error).size;
 }
 
 bool objectExists(
@@ -109,10 +107,9 @@ bool objectExists(
     const String & bucket,
     const String & key,
     const String & version_id,
-    const S3Settings::RequestSettings & request_settings,
-    bool for_disk_s3)
+    const S3Settings::RequestSettings & request_settings)
 {
-    auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, {}, for_disk_s3);
+    auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, {});
     if (object_info)
         return true;
 
@@ -130,10 +127,9 @@ void checkObjectExists(
     const String & key,
     const String & version_id,
     const S3Settings::RequestSettings & request_settings,
-    bool for_disk_s3,
     std::string_view description)
 {
-    auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, {}, for_disk_s3);
+    auto [object_info, error] = tryGetObjectInfo(client, bucket, key, version_id, request_settings, {});
     if (object_info)
         return;
 
     throw S3Exception(error.GetErrorType(), "{}Object {} in bucket {} suddenly disappeared: {}",
diff --git a/src/IO/S3/getObjectInfo.h b/src/IO/S3/getObjectInfo.h
index a57d807644b..ac8072a4338 100644
--- a/src/IO/S3/getObjectInfo.h
+++ b/src/IO/S3/getObjectInfo.h
@@ -26,7 +26,6 @@ ObjectInfo getObjectInfo(
     const String & version_id = {},
     const S3Settings::RequestSettings & request_settings = {},
     bool with_metadata = false,
-    bool for_disk_s3 = false,
     bool throw_on_error = true);
 
 size_t getObjectSize(
@@ -35,7 +34,6 @@ size_t getObjectSize(
     const String & key,
     const String & version_id = {},
     const S3Settings::RequestSettings & request_settings = {},
-    bool for_disk_s3 = false,
     bool throw_on_error = true);
 
 bool objectExists(
@@ -43,8 +41,7 @@ bool objectExists(
     const String & bucket,
     const String & key,
     const String & version_id = {},
-    const S3Settings::RequestSettings & request_settings = {},
-    bool for_disk_s3 = false);
+    const S3Settings::RequestSettings & request_settings = {});
 
 /// Throws an exception if a specified object doesn't exist. `description` is used as a part of the error message.
 void checkObjectExists(
@@ -53,7 +50,6 @@ void checkObjectExists(
     const String & key,
     const String & version_id = {},
     const S3Settings::RequestSettings & request_settings = {},
-    bool for_disk_s3 = false,
     std::string_view description = {});
 
 bool isNotFoundError(Aws::S3::S3Errors error);
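Reviewer note: dropping for_disk_s3 from these signatures is only safe because every call site is updated in the same commit. The removed flag sat between other defaulted bools, so a stale positional caller would still compile and silently rebind its argument. A hypothetical, deliberately simplified illustration (not the real signatures):

    #include <cassert>

    // Old shape: size_t getObjectSize(..., bool for_disk_s3 = false, bool throw_on_error = true);
    // New shape after this PR:
    size_t getObjectSize(bool throw_on_error = true)
    {
        // Dummy body, just to make the argument rebinding observable.
        return throw_on_error ? 42 : 0;
    }

    int main()
    {
        // A caller that used to write getObjectSize(/* for_disk_s3= */ false) and
        // was missed in the cleanup would still compile, but its `false` now
        // lands in throw_on_error: a silent behaviour change, not a type error.
        assert(getObjectSize(false) == 0);
        assert(getObjectSize() == 42);
        return 0;
    }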
diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp
index e41867ce225..3ea372f75d8 100644
--- a/src/IO/WriteBufferFromS3.cpp
+++ b/src/IO/WriteBufferFromS3.cpp
@@ -214,9 +214,9 @@ void WriteBufferFromS3::finalizeImpl()
 
     if (request_settings.check_objects_after_upload)
     {
-        S3::checkObjectExists(*client_ptr, bucket, key, {}, request_settings, /* for_disk_s3= */ write_settings.for_object_storage, "Immediately after upload");
+        S3::checkObjectExists(*client_ptr, bucket, key, {}, request_settings, "Immediately after upload");
 
-        size_t actual_size = S3::getObjectSize(*client_ptr, bucket, key, {}, request_settings, /* for_disk_s3= */ write_settings.for_object_storage);
+        size_t actual_size = S3::getObjectSize(*client_ptr, bucket, key, {}, request_settings);
         if (actual_size != total_size)
             throw Exception(
                 ErrorCodes::S3_ERROR,
@@ -390,7 +390,7 @@ void WriteBufferFromS3::createMultipartUpload()
     client_ptr->setKMSHeaders(req);
 
     ProfileEvents::increment(ProfileEvents::S3CreateMultipartUpload);
-    if (write_settings.for_object_storage)
+    if (client_ptr->isClientForDisk())
         ProfileEvents::increment(ProfileEvents::DiskS3CreateMultipartUpload);
 
     Stopwatch watch;
@@ -429,7 +429,7 @@ void WriteBufferFromS3::abortMultipartUpload()
     req.SetUploadId(multipart_upload_id);
 
     ProfileEvents::increment(ProfileEvents::S3AbortMultipartUpload);
-    if (write_settings.for_object_storage)
+    if (client_ptr->isClientForDisk())
         ProfileEvents::increment(ProfileEvents::DiskS3AbortMultipartUpload);
 
     Stopwatch watch;
@@ -530,7 +530,7 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data)
         getShortLogDetails(), data_size, part_number);
 
     ProfileEvents::increment(ProfileEvents::S3UploadPart);
-    if (write_settings.for_object_storage)
+    if (client_ptr->isClientForDisk())
         ProfileEvents::increment(ProfileEvents::DiskS3UploadPart);
 
     auto & request = std::get<0>(*worker_data);
@@ -606,7 +606,7 @@ void WriteBufferFromS3::completeMultipartUpload()
     for (size_t i = 0; i < max_retry; ++i)
     {
         ProfileEvents::increment(ProfileEvents::S3CompleteMultipartUpload);
-        if (write_settings.for_object_storage)
+        if (client_ptr->isClientForDisk())
             ProfileEvents::increment(ProfileEvents::DiskS3CompleteMultipartUpload);
 
         Stopwatch watch;
@@ -689,7 +689,7 @@ void WriteBufferFromS3::makeSinglepartUpload(WriteBufferFromS3::PartData && data)
     for (size_t i = 0; i < max_retry; ++i)
     {
         ProfileEvents::increment(ProfileEvents::S3PutObject);
-        if (write_settings.for_object_storage)
+        if (client_ptr->isClientForDisk())
             ProfileEvents::increment(ProfileEvents::DiskS3PutObject);
 
         ResourceCost cost = request.GetContentLength();
diff --git a/src/IO/WriteSettings.h b/src/IO/WriteSettings.h
index 4da3806e51d..84bb25439b5 100644
--- a/src/IO/WriteSettings.h
+++ b/src/IO/WriteSettings.h
@@ -25,9 +25,6 @@ struct WriteSettings
     bool s3_allow_parallel_part_upload = true;
     bool azure_allow_parallel_part_upload = true;
 
-    /// Monitoring
-    bool for_object_storage = false; // to choose which profile events should be incremented
-
     bool operator==(const WriteSettings & other) const = default;
 };
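Reviewer note: deleting for_object_storage from ReadSettings/WriteSettings has a subtle bonus. WriteSettings compares with a defaulted operator==, so the monitoring bit used to make otherwise-identical settings compare unequal depending on which code path produced them (IObjectStorage::patchSettings flipped it). A minimal sketch of that effect:

    #include <cassert>

    struct WriteSettingsOld
    {
        bool s3_allow_parallel_part_upload = true;
        bool for_object_storage = false;  // the field removed by this PR
        bool operator==(const WriteSettingsOld &) const = default;
    };

    struct WriteSettingsNew
    {
        bool s3_allow_parallel_part_upload = true;
        bool operator==(const WriteSettingsNew &) const = default;
    };

    int main()
    {
        WriteSettingsOld user_settings{};
        WriteSettingsOld patched = user_settings;
        patched.for_object_storage = true;    // what patchSettings() used to do
        assert(!(user_settings == patched));  // spurious inequality from a monitoring bit

        WriteSettingsNew a{}, b{};
        assert(a == b);                       // identical settings now stay equal
        return 0;
    }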
diff --git a/src/Storages/StorageAzureBlob.h b/src/Storages/StorageAzureBlob.h
index 5b0d8802657..20e7f4a6c90 100644
--- a/src/Storages/StorageAzureBlob.h
+++ b/src/Storages/StorageAzureBlob.h
@@ -33,8 +33,6 @@ public:
 
     bool update(const ContextPtr & context);
 
-    void connect(const ContextPtr & context);
-
     bool withGlobs() const { return blob_path.find_first_of("*?{") != std::string::npos; }
 
     bool withWildcard() const
diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp
index c2039c2dd79..3fe0b66a453 100644
--- a/src/Storages/StorageS3.cpp
+++ b/src/Storages/StorageS3.cpp
@@ -1866,7 +1866,6 @@ namespace
             configuration.url.version_id,
             configuration.request_settings,
             /*with_metadata=*/ false,
-            /*for_disk_s3=*/ false,
             /*throw_on_error= */ false).last_modification_time;
     }
 
diff --git a/tests/integration/test_checking_s3_blobs_paranoid/test.py b/tests/integration/test_checking_s3_blobs_paranoid/test.py
index 1391f1af6f1..22d6d263d23 100644
--- a/tests/integration/test_checking_s3_blobs_paranoid/test.py
+++ b/tests/integration/test_checking_s3_blobs_paranoid/test.py
@@ -2,6 +2,8 @@
 import logging
 
 import pytest
+import os
+import minio
 
 from helpers.cluster import ClickHouseCluster
 from helpers.mock_servers import start_s3_mock
@@ -608,3 +610,65 @@ def test_adaptive_timeouts(cluster, broken_s3, node_name):
     else:
         assert s3_use_adaptive_timeouts == "0"
         assert s3_errors == 0
+
+
+def test_no_key_found_disk(cluster, broken_s3):
+    node = cluster.instances["node"]
+
+    node.query(
+        """
+        CREATE TABLE no_key_found_disk (
+            id Int64
+        ) ENGINE=MergeTree()
+        ORDER BY id
+        SETTINGS
+            storage_policy='s3'
+        """
+    )
+
+    uuid = node.query(
+        """
+        SELECT uuid
+        FROM system.tables
+        WHERE name = 'no_key_found_disk'
+        """
+    ).strip()
+    assert uuid
+
+    node.query("INSERT INTO no_key_found_disk VALUES (1)")
+
+    data = node.query("SELECT * FROM no_key_found_disk").strip()
+
+    assert data == "1"
+
+    remote_paths = (
+        node.query(
+            f"""
+            SELECT remote_path
+            FROM system.remote_data_paths
+            WHERE
+                local_path LIKE '%{uuid}%'
+                AND local_path LIKE '%.bin%'
+            ORDER BY ALL
+            """
+        )
+        .strip()
+        .split()
+    )
+
+    assert len(remote_paths) > 0
+
+    for path in remote_paths:
+        # Delete the blob behind ClickHouse's back, then make sure it is gone.
+        assert cluster.minio_client.stat_object(cluster.minio_bucket, path).size > 0
+        cluster.minio_client.remove_object(cluster.minio_bucket, path)
+        with pytest.raises(Exception) as exc_info:
+            cluster.minio_client.stat_object(cluster.minio_bucket, path)
+        assert "code: NoSuchKey" in str(exc_info.value)
+
+    error = node.query_and_get_error("SELECT * FROM no_key_found_disk").strip()
+
+    assert (
+        "DB::Exception: The specified key does not exist. This error happened for S3 disk."
+        in error
+    )
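Reviewer note: the new integration test pins the exact user-visible text, so the fmt::format("{} {}", ...) call in Client::enrichErrorMessage and the assertion in test_no_key_found_disk must stay in sync. A tiny self-contained check of that shape (plain concatenation instead of fmt to stay dependency-free):

    #include <cassert>
    #include <string>

    int main()
    {
        const std::string aws_message = "The specified key does not exist.";
        const std::string enriched = aws_message + " " + "This error happened for S3 disk.";
        assert(enriched == "The specified key does not exist. This error happened for S3 disk.");
        return 0;
    }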