From 7fbf87be176081411918ae35f040f338892d1416 Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Tue, 18 Apr 2023 13:11:42 +0200 Subject: [PATCH] rework WriteBufferFromS3, squashed --- contrib/googletest-cmake/CMakeLists.txt | 25 +- src/Backups/BackupIO_S3.cpp | 1 - src/CMakeLists.txt | 1 + src/Disks/DiskLocal.cpp | 1 + .../IO/CachedOnDiskWriteBufferFromFile.cpp | 13 +- .../ObjectStorages/S3/S3ObjectStorage.cpp | 3 +- src/IO/S3/Client.cpp | 33 +- src/IO/S3/Client.h | 7 + src/IO/SwapHelper.cpp | 17 + src/IO/SwapHelper.h | 21 +- src/IO/WriteBuffer.h | 5 +- src/IO/WriteBufferFromFileDecorator.cpp | 38 +- src/IO/WriteBufferFromFileDecorator.h | 7 +- src/IO/WriteBufferFromS3.cpp | 946 +++++++------- src/IO/WriteBufferFromS3.h | 95 +- src/IO/WriteBufferFromS3MemoryStream.cpp | 68 + src/IO/WriteBufferFromS3MemoryStream.h | 39 + src/IO/WriteBufferFromS3TaskTracker.cpp | 137 ++ src/IO/WriteBufferFromS3TaskTracker.h | 37 + src/IO/tests/gtest_writebuffer_s3.cpp | 1114 +++++++++++++++++ src/Storages/MergeTree/MergeTreeData.cpp | 1 + .../MergeTree/MergeTreeDeduplicationLog.cpp | 3 + .../MergeTree/MergeTreeMutationEntry.cpp | 1 + src/Storages/StorageS3.cpp | 1 - .../02240_filesystem_query_cache.reference | 1 + .../02240_filesystem_query_cache.sql | 2 +- ...system_cache_on_write_operations.reference | 36 +- ...41_filesystem_cache_on_write_operations.sh | 19 +- ...ilesystem_cache_persistent_files.reference | 17 +- ...events_from_query_log_and_client.reference | 4 +- 30 files changed, 2102 insertions(+), 591 deletions(-) create mode 100644 src/IO/SwapHelper.cpp create mode 100644 src/IO/WriteBufferFromS3MemoryStream.cpp create mode 100644 src/IO/WriteBufferFromS3MemoryStream.h create mode 100644 src/IO/WriteBufferFromS3TaskTracker.cpp create mode 100644 src/IO/WriteBufferFromS3TaskTracker.h create mode 100644 src/IO/tests/gtest_writebuffer_s3.cpp diff --git a/contrib/googletest-cmake/CMakeLists.txt b/contrib/googletest-cmake/CMakeLists.txt index 90fdde0c185..3905df03155 100644 --- a/contrib/googletest-cmake/CMakeLists.txt +++ b/contrib/googletest-cmake/CMakeLists.txt @@ -1,15 +1,30 @@ -set (SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/googletest/googletest") +set (SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/googletest") -add_library(_gtest "${SRC_DIR}/src/gtest-all.cc") +add_library(_gtest "${SRC_DIR}/googletest/src/gtest-all.cc") set_target_properties(_gtest PROPERTIES VERSION "1.0.0") target_compile_definitions (_gtest PUBLIC GTEST_HAS_POSIX_RE=0) -target_include_directories(_gtest SYSTEM PUBLIC "${SRC_DIR}/include") -target_include_directories(_gtest PRIVATE "${SRC_DIR}") +target_include_directories(_gtest SYSTEM PUBLIC "${SRC_DIR}/googletest/include") +target_include_directories(_gtest PRIVATE "${SRC_DIR}/googletest") -add_library(_gtest_main "${SRC_DIR}/src/gtest_main.cc") +add_library(_gtest_main "${SRC_DIR}/googletest/src/gtest_main.cc") set_target_properties(_gtest_main PROPERTIES VERSION "1.0.0") target_link_libraries(_gtest_main PUBLIC _gtest) add_library(_gtest_all INTERFACE) target_link_libraries(_gtest_all INTERFACE _gtest _gtest_main) add_library(ch_contrib::gtest_all ALIAS _gtest_all) + + +add_library(_gmock "${SRC_DIR}/googlemock/src/gmock-all.cc") +set_target_properties(_gmock PROPERTIES VERSION "1.0.0") +target_compile_definitions (_gmock PUBLIC GTEST_HAS_POSIX_RE=0) +target_include_directories(_gmock SYSTEM PUBLIC "${SRC_DIR}/googlemock/include" "${SRC_DIR}/googletest/include") +target_include_directories(_gmock PRIVATE "${SRC_DIR}/googlemock") + +add_library(_gmock_main 
"${SRC_DIR}/googlemock/src/gmock_main.cc") +set_target_properties(_gmock_main PROPERTIES VERSION "1.0.0") +target_link_libraries(_gmock_main PUBLIC _gmock) + +add_library(_gmock_all INTERFACE) +target_link_libraries(_gmock_all INTERFACE _gmock _gmock_main) +add_library(ch_contrib::gmock_all ALIAS _gmock_all) diff --git a/src/Backups/BackupIO_S3.cpp b/src/Backups/BackupIO_S3.cpp index 90333900d4a..84dba63ae4e 100644 --- a/src/Backups/BackupIO_S3.cpp +++ b/src/Backups/BackupIO_S3.cpp @@ -253,7 +253,6 @@ std::unique_ptr BackupWriterS3::writeFile(const String & file_name) fs::path(s3_uri.key) / file_name, request_settings, std::nullopt, - DBMS_DEFAULT_BUFFER_SIZE, threadPoolCallbackRunner(BackupsIOThreadPool::get(), "BackupWriterS3")); } diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b3f4fbb7420..ac99a7c3669 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -612,6 +612,7 @@ if (ENABLE_TESTS) target_link_libraries(unit_tests_dbms PRIVATE ch_contrib::gtest_all + ch_contrib::gmock_all clickhouse_functions clickhouse_aggregate_functions clickhouse_parsers diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp index 69b70da272a..1abecb7af4e 100644 --- a/src/Disks/DiskLocal.cpp +++ b/src/Disks/DiskLocal.cpp @@ -544,6 +544,7 @@ try auto tmp_file = std::make_unique(disk_ptr); auto buf = std::make_unique(std::move(tmp_file)); buf->write(data.data, data.PAGE_SIZE_IN_BYTES); + buf->finalize(); buf->sync(); } return true; diff --git a/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp b/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp index af2226ea6ca..9153af90312 100644 --- a/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp +++ b/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp @@ -211,10 +211,16 @@ void CachedOnDiskWriteBufferFromFile::nextImpl() { size_t size = offset(); + /// Write data to cache. + cacheData(working_buffer.begin(), size, throw_on_error_from_cache); + current_download_offset += size; + try { SwapHelper swap(*this, *impl); /// Write data to the underlying buffer. + /// Actually here WriteBufferFromFileDecorator::nextImpl has to be called, but it is pivate method. + /// In particular WriteBufferFromFileDecorator introduces logic with swaps in order to achieve delegation. impl->next(); } catch (...) @@ -225,10 +231,6 @@ void CachedOnDiskWriteBufferFromFile::nextImpl() throw; } - - /// Write data to cache. - cacheData(working_buffer.begin(), size, throw_on_error_from_cache); - current_download_offset += size; } void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size, bool throw_on_error) @@ -292,8 +294,7 @@ void CachedOnDiskWriteBufferFromFile::finalizeImpl() { try { - SwapHelper swap(*this, *impl); - impl->finalize(); + WriteBufferFromFileDecorator::finalizeImpl(); } catch (...) 
{ diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp index 2eee8bf5693..79b3d3a2b8b 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp @@ -161,7 +161,7 @@ std::unique_ptr S3ObjectStorage::writeObject( /// NOLIN WriteMode mode, // S3 doesn't support append, only rewrite std::optional attributes, FinalizeCallback && finalize_callback, - size_t buf_size, + size_t buf_size [[maybe_unused]], const WriteSettings & write_settings) { WriteSettings disk_write_settings = IObjectStorage::patchSettings(write_settings); @@ -180,7 +180,6 @@ std::unique_ptr S3ObjectStorage::writeObject( /// NOLIN object.remote_path, settings_ptr->request_settings, attributes, - buf_size, std::move(scheduler), disk_write_settings); diff --git a/src/IO/S3/Client.cpp b/src/IO/S3/Client.cpp index 3c0a8122a91..3c39893b44e 100644 --- a/src/IO/S3/Client.cpp +++ b/src/IO/S3/Client.cpp @@ -255,7 +255,7 @@ Model::HeadObjectOutcome Client::HeadObject(const HeadObjectRequest & request) c if (auto uri = getURIForBucket(bucket); uri.has_value()) request.overrideURI(std::move(*uri)); - auto result = Aws::S3::S3Client::HeadObject(request); + auto result = HeadObject(static_cast(request)); if (result.IsSuccess()) return result; @@ -312,70 +312,75 @@ Model::HeadObjectOutcome Client::HeadObject(const HeadObjectRequest & request) c request.overrideURI(std::move(*bucket_uri)); - return Aws::S3::S3Client::HeadObject(request); + /// The next call is NOT a recurcive call + /// This is a virtuall call Aws::S3::S3Client::HeadObject(const Model::HeadObjectRequest&) + return HeadObject(static_cast(request)); } +/// For each request, we wrap the request functions from Aws::S3::Client with doRequest +/// doRequest calls virtuall function from Aws::S3::Client while DB::S3::Client has not virtual calls for each request type + Model::ListObjectsV2Outcome Client::ListObjectsV2(const ListObjectsV2Request & request) const { - return doRequest(request, [this](const Model::ListObjectsV2Request & req) { return Aws::S3::S3Client::ListObjectsV2(req); }); + return doRequest(request, [this](const Model::ListObjectsV2Request & req) { return ListObjectsV2(req); }); } Model::ListObjectsOutcome Client::ListObjects(const ListObjectsRequest & request) const { - return doRequest(request, [this](const Model::ListObjectsRequest & req) { return Aws::S3::S3Client::ListObjects(req); }); + return doRequest(request, [this](const Model::ListObjectsRequest & req) { return ListObjects(req); }); } Model::GetObjectOutcome Client::GetObject(const GetObjectRequest & request) const { - return doRequest(request, [this](const Model::GetObjectRequest & req) { return Aws::S3::S3Client::GetObject(req); }); + return doRequest(request, [this](const Model::GetObjectRequest & req) { return GetObject(req); }); } Model::AbortMultipartUploadOutcome Client::AbortMultipartUpload(const AbortMultipartUploadRequest & request) const { return doRequest( - request, [this](const Model::AbortMultipartUploadRequest & req) { return Aws::S3::S3Client::AbortMultipartUpload(req); }); + request, [this](const Model::AbortMultipartUploadRequest & req) { return AbortMultipartUpload(req); }); } Model::CreateMultipartUploadOutcome Client::CreateMultipartUpload(const CreateMultipartUploadRequest & request) const { return doRequest( - request, [this](const Model::CreateMultipartUploadRequest & req) { return Aws::S3::S3Client::CreateMultipartUpload(req); }); + request, [this](const 
Model::CreateMultipartUploadRequest & req) { return CreateMultipartUpload(req); }); } Model::CompleteMultipartUploadOutcome Client::CompleteMultipartUpload(const CompleteMultipartUploadRequest & request) const { return doRequest( - request, [this](const Model::CompleteMultipartUploadRequest & req) { return Aws::S3::S3Client::CompleteMultipartUpload(req); }); + request, [this](const Model::CompleteMultipartUploadRequest & req) { return CompleteMultipartUpload(req); }); } Model::CopyObjectOutcome Client::CopyObject(const CopyObjectRequest & request) const { - return doRequest(request, [this](const Model::CopyObjectRequest & req) { return Aws::S3::S3Client::CopyObject(req); }); + return doRequest(request, [this](const Model::CopyObjectRequest & req) { return CopyObject(req); }); } Model::PutObjectOutcome Client::PutObject(const PutObjectRequest & request) const { - return doRequest(request, [this](const Model::PutObjectRequest & req) { return Aws::S3::S3Client::PutObject(req); }); + return doRequest(request, [this](const Model::PutObjectRequest & req) { return PutObject(req); }); } Model::UploadPartOutcome Client::UploadPart(const UploadPartRequest & request) const { - return doRequest(request, [this](const Model::UploadPartRequest & req) { return Aws::S3::S3Client::UploadPart(req); }); + return doRequest(request, [this](const Model::UploadPartRequest & req) { return UploadPart(req); }); } Model::UploadPartCopyOutcome Client::UploadPartCopy(const UploadPartCopyRequest & request) const { - return doRequest(request, [this](const Model::UploadPartCopyRequest & req) { return Aws::S3::S3Client::UploadPartCopy(req); }); + return doRequest(request, [this](const Model::UploadPartCopyRequest & req) { return UploadPartCopy(req); }); } Model::DeleteObjectOutcome Client::DeleteObject(const DeleteObjectRequest & request) const { - return doRequest(request, [this](const Model::DeleteObjectRequest & req) { return Aws::S3::S3Client::DeleteObject(req); }); + return doRequest(request, [this](const Model::DeleteObjectRequest & req) { return DeleteObject(req); }); } Model::DeleteObjectsOutcome Client::DeleteObjects(const DeleteObjectsRequest & request) const { - return doRequest(request, [this](const Model::DeleteObjectsRequest & req) { return Aws::S3::S3Client::DeleteObjects(req); }); + return doRequest(request, [this](const Model::DeleteObjectsRequest & req) { return DeleteObjects(req); }); } template diff --git a/src/IO/S3/Client.h b/src/IO/S3/Client.h index 63feb94e593..330c85c418a 100644 --- a/src/IO/S3/Client.h +++ b/src/IO/S3/Client.h @@ -40,6 +40,11 @@ struct ServerSideEncryptionKMSConfig #include #include +namespace MockS3 +{ + struct Client; +} + namespace DB::S3 { @@ -195,6 +200,8 @@ public: bool supportsMultiPartCopy() const; private: + friend struct ::MockS3::Client; + Client(size_t max_redirects_, ServerSideEncryptionKMSConfig sse_kms_config_, const std::shared_ptr& credentials_provider, diff --git a/src/IO/SwapHelper.cpp b/src/IO/SwapHelper.cpp new file mode 100644 index 00000000000..4a1cc8acf4c --- /dev/null +++ b/src/IO/SwapHelper.cpp @@ -0,0 +1,17 @@ +#include + +namespace DB +{ + +SwapHelper::SwapHelper(BufferBase & b1_, BufferBase & b2_) + : b1(b1_), b2(b2_) +{ + b1.swap(b2); +} + +SwapHelper::~SwapHelper() +{ + b1.swap(b2); +} + +} diff --git a/src/IO/SwapHelper.h b/src/IO/SwapHelper.h index cedbf5f78fe..fcf32927f23 100644 --- a/src/IO/SwapHelper.h +++ b/src/IO/SwapHelper.h @@ -1,16 +1,19 @@ #pragma once + #include namespace DB { - class SwapHelper - { - public: - SwapHelper(BufferBase & 
b1_, BufferBase & b2_) : b1(b1_), b2(b2_) { b1.swap(b2); } - ~SwapHelper() { b1.swap(b2); } - private: - BufferBase & b1; - BufferBase & b2; - }; +class SwapHelper +{ +public: + SwapHelper(BufferBase & b1_, BufferBase & b2_); + ~SwapHelper(); + +private: + BufferBase & b1; + BufferBase & b2; +}; + } diff --git a/src/IO/WriteBuffer.h b/src/IO/WriteBuffer.h index 436d07515a3..2c891e17d9a 100644 --- a/src/IO/WriteBuffer.h +++ b/src/IO/WriteBuffer.h @@ -42,7 +42,8 @@ public: { if (!offset()) return; - bytes += offset(); + + auto bytes_in_buffer = offset(); try { @@ -54,9 +55,11 @@ public: * so that later (for example, when the stack was expanded) there was no second attempt to write data. */ pos = working_buffer.begin(); + bytes += bytes_in_buffer; throw; } + bytes += bytes_in_buffer; pos = working_buffer.begin(); } diff --git a/src/IO/WriteBufferFromFileDecorator.cpp b/src/IO/WriteBufferFromFileDecorator.cpp index ac801534b4f..4cc881f177f 100644 --- a/src/IO/WriteBufferFromFileDecorator.cpp +++ b/src/IO/WriteBufferFromFileDecorator.cpp @@ -1,6 +1,7 @@ #include "WriteBufferFromFileDecorator.h" #include +#include namespace DB { @@ -13,12 +14,18 @@ WriteBufferFromFileDecorator::WriteBufferFromFileDecorator(std::unique_ptrfinalized is remain false + /// That leads to situation when the destructor of impl is called with impl->finalized equal false. if (!is_prefinalized) WriteBufferFromFileDecorator::preFinalize(); - impl->finalize(); + { + SwapHelper swap(*this, *impl); + impl->finalize(); + } } WriteBufferFromFileDecorator::~WriteBufferFromFileDecorator() @@ -31,11 +38,21 @@ WriteBufferFromFileDecorator::~WriteBufferFromFileDecorator() { tryLogCurrentException(__PRETTY_FUNCTION__); } + + /// It is not a mistake that swap is called here + /// Swap has been called at constructor, it should be called at destructor + /// In oreder to provide valid buffer for impl's d-tor call + swap(*impl); } void WriteBufferFromFileDecorator::sync() { - impl->sync(); + next(); + + { + SwapHelper swap(*this, *impl); + impl->sync(); + } } std::string WriteBufferFromFileDecorator::getFileName() const @@ -45,11 +62,22 @@ std::string WriteBufferFromFileDecorator::getFileName() const return std::string(); } +void WriteBufferFromFileDecorator::preFinalize() +{ + next(); + + { + SwapHelper swap(*this, *impl); + impl->preFinalize(); + } + + is_prefinalized = true; +} + void WriteBufferFromFileDecorator::nextImpl() { - swap(*impl); + SwapHelper swap(*this, *impl); impl->next(); - swap(*impl); } } diff --git a/src/IO/WriteBufferFromFileDecorator.h b/src/IO/WriteBufferFromFileDecorator.h index dde05276c28..5344bb1425c 100644 --- a/src/IO/WriteBufferFromFileDecorator.h +++ b/src/IO/WriteBufferFromFileDecorator.h @@ -17,12 +17,7 @@ public: std::string getFileName() const override; - void preFinalize() override - { - next(); - impl->preFinalize(); - is_prefinalized = true; - } + void preFinalize() override; const WriteBuffer & getImpl() const { return *impl; } diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index 5a25cb89107..5630ed2cb68 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -2,13 +2,16 @@ #if USE_AWS_S3 +#include "WriteBufferFromS3.h" +#include "WriteBufferFromS3MemoryStream.h" +#include "WriteBufferFromS3TaskTracker.h" + #include #include #include #include #include -#include #include #include #include @@ -29,11 +32,13 @@ namespace ProfileEvents extern const Event S3CreateMultipartUpload; extern const Event S3CompleteMultipartUpload; + extern const Event 
S3AbortMultipartUpload; extern const Event S3UploadPart; extern const Event S3PutObject; extern const Event DiskS3CreateMultipartUpload; extern const Event DiskS3CompleteMultipartUpload; + extern const Event DiskS3AbortMultipartUpload; extern const Event DiskS3UploadPart; extern const Event DiskS3PutObject; @@ -43,30 +48,105 @@ namespace ProfileEvents namespace DB { -// S3 protocol does not allow to have multipart upload with more than 10000 parts. -// In case server does not return an error on exceeding that number, we print a warning -// because custom S3 implementation may allow relaxed requirements on that. -const int S3_WARN_MAX_PARTS = 10000; namespace ErrorCodes { extern const int S3_ERROR; extern const int INVALID_CONFIG_PARAMETER; + extern const int LOGICAL_ERROR; } -struct WriteBufferFromS3::UploadPartTask +struct WriteBufferFromS3::PartData { - S3::UploadPartRequest req; - bool is_finished = false; - std::string tag; - std::exception_ptr exception; + Memory<> memory; + size_t data_size = 0; + + std::shared_ptr createAwsBuffer() + { + auto buffer = std::make_shared(memory.data(), data_size); + buffer->exceptions(std::ios::badbit); + return buffer; + } + + bool isEmpty() const + { + return data_size == 0; + } }; -struct WriteBufferFromS3::PutObjectTask +struct WriteBufferFromS3::BufferAllocationPolicy { - S3::PutObjectRequest req; - bool is_finished = false; - std::exception_ptr exception; + size_t first_size = 0; + size_t second_size = 0; + + size_t multiply_factor = 0; + size_t multiply_threshold = 0; + size_t max_size = 0; + + size_t current_size = 0; + size_t buffer_number = 0; + + explicit BufferAllocationPolicy(const S3Settings::RequestSettings::PartUploadSettings & settings_) + : first_size(std::max(settings_.max_single_part_upload_size, settings_.min_upload_part_size)) + , second_size(settings_.min_upload_part_size) + , multiply_factor(settings_.upload_part_size_multiply_factor) + , multiply_threshold(settings_.upload_part_size_multiply_parts_count_threshold) + , max_size(settings_.max_upload_part_size) + { + if (settings_.strict_upload_part_size > 0) + { + first_size = settings_.strict_upload_part_size; + second_size = settings_.strict_upload_part_size; + multiply_factor = 1; + multiply_threshold = 10000; + max_size = settings_.max_upload_part_size; + } + else + { + first_size = std::max(settings_.max_single_part_upload_size, settings_.min_upload_part_size); + second_size = settings_.min_upload_part_size; + multiply_factor = settings_.upload_part_size_multiply_factor; + multiply_threshold = settings_.upload_part_size_multiply_parts_count_threshold; + max_size = settings_.max_upload_part_size; + } + + chassert(first_size > 0); + chassert(second_size > 0); + chassert(multiply_factor >= 1); + chassert(multiply_threshold > 0); + chassert(max_size > 0); + } + + size_t getNumber() const + { + return buffer_number; + } + + size_t getSize() const + { + chassert(buffer_number > 0); + return current_size; + } + + void next() + { + ++buffer_number; + + if (1 == buffer_number) + { + current_size = first_size; + return; + } + + if (2 == buffer_number) + current_size = second_size; + + if (0 == ((buffer_number-1) % multiply_threshold)) + { + current_size *= multiply_factor; + current_size = std::min(current_size, max_size); + } + } }; WriteBufferFromS3::WriteBufferFromS3( @@ -75,146 +155,88 @@ WriteBufferFromS3::WriteBufferFromS3( const String & key_, const S3Settings::RequestSettings & request_settings_, std::optional> object_metadata_, - size_t buffer_size_, ThreadPoolCallbackRunner 
schedule_, const WriteSettings & write_settings_) - : BufferWithOwnMemory(buffer_size_, nullptr, 0) - , bucket(bucket_) + : bucket(bucket_) , key(key_) , request_settings(request_settings_) , upload_settings(request_settings.getUploadSettings()) + , write_settings(write_settings_) , client_ptr(std::move(client_ptr_)) , object_metadata(std::move(object_metadata_)) - , strict_upload_part_size(upload_settings.strict_upload_part_size) - , current_upload_part_size(upload_settings.min_upload_part_size) - , schedule(std::move(schedule_)) - , write_settings(write_settings_) + , buffer_allocation_policy(std::make_unique(request_settings_.getUploadSettings())) + , task_tracker(std::make_unique(std::move(schedule_))) { + LOG_TRACE(log, "Create WriteBufferFromS3, {}", getLogDetails()); + allocateBuffer(); } void WriteBufferFromS3::nextImpl() { - if (!offset()) + LOG_TRACE(log, "nextImpl with incoming data size {}, memory buffer size {}. {}", offset(), memory.size(), getLogDetails()); + + if (is_prefinalized) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Cannot write to prefinalized buffer for S3, the file could have been created with PutObjectRequest"); + + /// Make sense to call to before adding new async task to check if there is an exception + task_tracker->getReady(); + + hidePartialData(); + + reallocateFirstBuffer(); + + if (available() > 0) return; - /// Buffer in a bad state after exception - if (temporary_buffer->tellp() == -1) - allocateBuffer(); - else - chassert(temporary_buffer->tellp() == static_cast(last_part_size)); + detachBuffer(); - if (strict_upload_part_size) - processWithStrictParts(); - else - processWithDynamicParts(); + if (!multipart_upload_id.empty() || detached_part_data.size() > 1) + writeMultipartUpload(); - waitForReadyBackgroundTasks(); -} - -void WriteBufferFromS3::processWithStrictParts() -{ - chassert(strict_upload_part_size > 0); - - size_t buffer_size = offset(); - size_t left_in_buffer = buffer_size; - size_t new_size = last_part_size + buffer_size; - size_t buffer_offset = 0; - - if (new_size > strict_upload_part_size) - { - /// Data size will exceed fixed part size threshold for multipart upload, need to use multipart upload. 
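For readers following the new allocation logic, here is a minimal standalone sketch of the growth rule that the BufferAllocationPolicy introduced above implements. The struct name and the numeric settings are illustrative placeholders, not ClickHouse defaults; only the growth rule itself mirrors the patch.

#include <algorithm>
#include <cstddef>
#include <cstdio>

struct GrowthSketch
{
    // Assumed, illustrative values (the real ones come from S3 request settings).
    size_t first_size = 32 << 20;       // max(max_single_part_upload_size, min_upload_part_size)
    size_t second_size = 16 << 20;      // min_upload_part_size
    size_t multiply_factor = 2;
    size_t multiply_threshold = 4;
    size_t max_size = 256 << 20;

    size_t current_size = 0;
    size_t buffer_number = 0;

    void next()
    {
        ++buffer_number;
        if (buffer_number == 1)
        {
            current_size = first_size;
            return;
        }
        if (buffer_number == 2)
            current_size = second_size;
        // Every `multiply_threshold` buffers the part size is multiplied and capped.
        if ((buffer_number - 1) % multiply_threshold == 0)
            current_size = std::min(current_size * multiply_factor, max_size);
    }
};

int main()
{
    GrowthSketch policy;
    for (int i = 0; i < 12; ++i)
    {
        policy.next();
        std::printf("part %zu -> %zu MiB\n", policy.buffer_number, policy.current_size >> 20);
    }
}

Running it prints one oversized first buffer (single-part upload candidate), then parts that start small and grow every few parts, which is the behaviour the policy encapsulates.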
- if (multipart_upload_id.empty()) - createMultipartUpload(); - - while (new_size > strict_upload_part_size) - { - size_t to_write = strict_upload_part_size - last_part_size; - temporary_buffer->write(working_buffer.begin() + buffer_offset, to_write); - buffer_offset += to_write; - - writePart(); - allocateBuffer(); - - new_size -= strict_upload_part_size; - left_in_buffer -= to_write; - } - } - - if (left_in_buffer) - { - temporary_buffer->write(working_buffer.begin() + buffer_offset, left_in_buffer); - last_part_size += left_in_buffer; - } - - ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Bytes, buffer_size); - - if (write_settings.remote_throttler) - write_settings.remote_throttler->add(buffer_size, ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds); -} - -void WriteBufferFromS3::processWithDynamicParts() -{ - chassert(current_upload_part_size > 0); - - size_t size = offset(); - temporary_buffer->write(working_buffer.begin(), size); - ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Bytes, size); - last_part_size += size; - - if (write_settings.remote_throttler) - write_settings.remote_throttler->add(size, ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds); - - /// Data size exceeds singlepart upload threshold, need to use multipart upload. - if (multipart_upload_id.empty() && last_part_size > upload_settings.max_single_part_upload_size) - createMultipartUpload(); - - if (!multipart_upload_id.empty() && last_part_size > current_upload_part_size) - { - writePart(); - allocateBuffer(); - } -} - -void WriteBufferFromS3::allocateBuffer() -{ - temporary_buffer = Aws::MakeShared("temporary buffer"); - temporary_buffer->exceptions(std::ios::badbit); - last_part_size = 0; -} - -WriteBufferFromS3::~WriteBufferFromS3() -{ -#ifndef NDEBUG - if (!finalized) - { - LOG_ERROR(log, "WriteBufferFromS3 is not finalized in destructor. It's a bug"); - std::terminate(); - } -#else - try - { - finalize(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } -#endif + allocateBuffer(); } void WriteBufferFromS3::preFinalize() { - next(); + if (is_prefinalized) + return; - if (multipart_upload_id.empty()) + LOG_TRACE(log, "preFinalize WriteBufferFromS3. {}", getLogDetails()); + + task_tracker->getReady(); + + hidePartialData(); + + if (hidden_size > 0) + detachBuffer(); + setFakeBufferWhenPreFinalized(); + + bool do_single_part_upload = false; + + if (multipart_upload_id.empty() && detached_part_data.size() <= 1) { - makeSinglepartUpload(); + if (detached_part_data.empty() || detached_part_data.front().data_size <= upload_settings.max_single_part_upload_size) + do_single_part_upload = true; + } + + if (do_single_part_upload) + { + if (detached_part_data.empty()) + { + makeSinglepartUpload({}); + } + else + { + makeSinglepartUpload(std::move(detached_part_data.front())); + detached_part_data.pop_front(); + } } else { - /// Write rest of the data as last part. - writePart(); + writeMultipartUpload(); } is_prefinalized = true; @@ -222,24 +244,182 @@ void WriteBufferFromS3::preFinalize() void WriteBufferFromS3::finalizeImpl() { + LOG_TRACE(log, "finalizeImpl WriteBufferFromS3. 
{}.", getLogDetails()); + if (!is_prefinalized) preFinalize(); - waitForAllBackgroundTasks(); + chassert(offset() == 0); + chassert(hidden_size == 0); + + task_tracker->getAll(); if (!multipart_upload_id.empty()) + { completeMultipartUpload(); + multipart_upload_finished = true; + } if (request_settings.check_objects_after_upload) { LOG_TRACE(log, "Checking object {} exists after upload", key); S3::checkObjectExists(*client_ptr, bucket, key, {}, request_settings, /* for_disk_s3= */ write_settings.for_object_storage, "Immediately after upload"); + + LOG_TRACE(log, "Checking object {} has size as expected {}", key, total_size); + size_t actual_size = S3::getObjectSize(*client_ptr, bucket, key, {}, request_settings, /* for_disk_s3= */ write_settings.for_object_storage); + if (actual_size != total_size) + throw Exception( + ErrorCodes::S3_ERROR, + "Object {} from bucket {} has unexpected size {} after upload, expected size {}, it's a bug in S3 or S3 API.", + key, bucket, actual_size, total_size); + LOG_TRACE(log, "Object {} exists after upload", key); } } -void WriteBufferFromS3::fillCreateMultipartRequest(DB::S3::CreateMultipartUploadRequest & req) +String WriteBufferFromS3::getLogDetails() const { + String multipart_upload_details; + if (!multipart_upload_id.empty()) + multipart_upload_details = fmt::format(", upload id {}, upload has finished {}" + , multipart_upload_id, multipart_upload_finished); + + return fmt::format("Details: bucket {}, key {}, total size {}, count {}, hidden_size {}, offset {}, with pool: {}, finalized {}{}", + bucket, key, total_size, count(), hidden_size, offset(), task_tracker->isAsync(), finalized, multipart_upload_details); +} + +void WriteBufferFromS3::tryToAbortMultipartUpload() +{ + try + { + task_tracker->safeWaitAll(); + abortMultipartUpload(); + } + catch (...) + { + LOG_ERROR(log, "Multipart upload hasn't aborted. {}", getLogDetails()); + tryLogCurrentException(__PRETTY_FUNCTION__); + } +} + +WriteBufferFromS3::~WriteBufferFromS3() +{ + LOG_TRACE(log, "Close WriteBufferFromS3. {}.", getLogDetails()); + + // That descructor could be call with finalized=false in case of exceptions + if (!finalized) + { + LOG_ERROR(log, "WriteBufferFromS3 is not finalized in destructor. It could be if an exception occurs. File is not written to S3. {}.", getLogDetails()); + } + + task_tracker->safeWaitAll(); + + if (!multipart_upload_id.empty() && !multipart_upload_finished) + { + LOG_WARNING(log, "WriteBufferFromS3 was neither finished nor aborted, try to abort upload in destructor. 
{}.", getLogDetails()); + tryToAbortMultipartUpload(); + } +} + +void WriteBufferFromS3::hidePartialData() +{ + if (write_settings.remote_throttler) + write_settings.remote_throttler->add(offset(), ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds); + + chassert(memory.size() >= hidden_size + offset()); + + hidden_size += offset(); + chassert(memory.data() + hidden_size == working_buffer.begin() + offset()); + chassert(memory.data() + hidden_size == position()); + + WriteBuffer::set(memory.data() + hidden_size, memory.size() - hidden_size); + chassert(offset() == 0); +} + +void WriteBufferFromS3::reallocateFirstBuffer() +{ + chassert(offset() == 0); + + if (buffer_allocation_policy->getNumber() > 1 || available() > 0) + return; + + const size_t max_first_buffer = buffer_allocation_policy->getSize(); + if (memory.size() == max_first_buffer) + return; + + size_t size = std::min(memory.size() * 2, max_first_buffer); + memory.resize(size); + + WriteBuffer::set(memory.data() + hidden_size, memory.size() - hidden_size); + + chassert(offset() == 0); + + LOG_TRACE(log, "Reallocated first buffer with size {}. {}", memory.size(), getLogDetails()); +} + +void WriteBufferFromS3::detachBuffer() +{ + size_t data_size = size_t(position() - memory.data()); + chassert(data_size == hidden_size); + + auto buf = std::move(memory); + + WriteBuffer::set(nullptr, 0); + total_size += hidden_size; + hidden_size = 0; + + detached_part_data.push_back({std::move(buf), data_size}); +} + +void WriteBufferFromS3::allocateFirstBuffer() +{ + const auto max_first_buffer = buffer_allocation_policy->getSize(); + const auto size = std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), max_first_buffer); + memory = Memory(size); + WriteBuffer::set(memory.data(), memory.size()); + + LOG_TRACE(log, "Allocated first buffer with size {}. {}", memory.size(), getLogDetails()); +} + +void WriteBufferFromS3::allocateBuffer() +{ + buffer_allocation_policy->next(); + chassert(0 == hidden_size); + + if (buffer_allocation_policy->getNumber() == 1) + return allocateFirstBuffer(); + + memory = Memory(buffer_allocation_policy->getSize()); + WriteBuffer::set(memory.data(), memory.size()); + + LOG_TRACE(log, "Allocated buffer with size {}. {}", buffer_allocation_policy->getSize(), getLogDetails()); +} + +void WriteBufferFromS3::setFakeBufferWhenPreFinalized() +{ + WriteBuffer::set(fake_buffer_when_prefinalized, sizeof(fake_buffer_when_prefinalized)); +} + +void WriteBufferFromS3::writeMultipartUpload() +{ + if (multipart_upload_id.empty()) + { + createMultipartUpload(); + } + + while (!detached_part_data.empty()) + { + writePart(std::move(detached_part_data.front())); + detached_part_data.pop_front(); + } +} + +void WriteBufferFromS3::createMultipartUpload() +{ + LOG_TRACE(log, "Create multipart upload. 
Bucket: {}, Key: {}, Upload id: {}", bucket, key, multipart_upload_id); + + S3::CreateMultipartUploadRequest req; + req.SetBucket(bucket); req.SetKey(key); @@ -250,12 +430,6 @@ void WriteBufferFromS3::fillCreateMultipartRequest(DB::S3::CreateMultipartUpload req.SetMetadata(object_metadata.value()); client_ptr->setKMSHeaders(req); -} - -void WriteBufferFromS3::createMultipartUpload() -{ - DB::S3::CreateMultipartUploadRequest req; - fillCreateMultipartRequest(req); ProfileEvents::increment(ProfileEvents::S3CreateMultipartUpload); if (write_settings.for_object_storage) @@ -267,184 +441,164 @@ void WriteBufferFromS3::createMultipartUpload() ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds()); - if (outcome.IsSuccess()) - { - multipart_upload_id = outcome.GetResult().GetUploadId(); - LOG_TRACE(log, "Multipart upload has created. Bucket: {}, Key: {}, Upload id: {}", bucket, key, multipart_upload_id); - } - else + if (!outcome.IsSuccess()) { ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1); throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType()); } + + multipart_upload_id = outcome.GetResult().GetUploadId(); + LOG_TRACE(log, "Multipart upload has created. {}", getLogDetails()); } -void WriteBufferFromS3::writePart() +void WriteBufferFromS3::abortMultipartUpload() { - auto size = temporary_buffer->tellp(); - - LOG_TRACE(log, "Writing part. Bucket: {}, Key: {}, Upload_id: {}, Size: {}", bucket, key, multipart_upload_id, size); - - if (size < 0) + if (multipart_upload_id.empty()) { - LOG_WARNING(log, "Skipping part upload. Buffer is in bad state, it means that we have tried to upload something, but got an exception."); + LOG_WARNING(log, "Nothing to abort. {}", getLogDetails()); return; } - if (size == 0) + LOG_WARNING(log, "Abort multipart upload. {}", getLogDetails()); + + S3::AbortMultipartUploadRequest req; + req.SetBucket(bucket); + req.SetKey(key); + req.SetUploadId(multipart_upload_id); + + ProfileEvents::increment(ProfileEvents::S3AbortMultipartUpload); + if (write_settings.for_object_storage) + ProfileEvents::increment(ProfileEvents::DiskS3AbortMultipartUpload); + + Stopwatch watch; + auto outcome = client_ptr->AbortMultipartUpload(req); + watch.stop(); + + ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds()); + + if (!outcome.IsSuccess()) { - LOG_TRACE(log, "Skipping writing part. Buffer is empty."); - return; + ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1); + throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType()); } - if (TSA_SUPPRESS_WARNING_FOR_READ(part_tags).size() == S3_WARN_MAX_PARTS) - { - // Don't throw exception here by ourselves but leave the decision to take by S3 server. - LOG_WARNING(log, "Maximum part number in S3 protocol has reached (too many parts). Server may not accept this whole upload."); - } - - if (schedule) - { - UploadPartTask * task = nullptr; - - { - std::lock_guard lock(bg_tasks_mutex); - task = &upload_object_tasks.emplace_back(); - ++num_added_bg_tasks; - } - - /// Notify waiting thread when task finished - auto task_finish_notify = [&, task]() - { - std::lock_guard lock(bg_tasks_mutex); - task->is_finished = true; - ++num_finished_bg_tasks; - - /// Notification under mutex is important here. - /// Otherwise, WriteBuffer could be destroyed in between - /// Releasing lock and condvar notification. 
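To make the create/complete/abort sequence above easier to follow, here is a self-contained sketch of the lifecycle the buffer now guarantees. FakeClient and MultipartGuard are invented stand-ins for illustration only, not the real DB::S3::Client or WriteBufferFromS3 API; the point is the ordering and the abort-on-unfinished-upload guarantee in the destructor.

#include <cstddef>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

struct FakeClient
{
    std::string createMultipartUpload() { return "upload-1"; }
    std::string uploadPart(const std::string &, size_t part_number) { return "etag-" + std::to_string(part_number); }
    void completeMultipartUpload(const std::string &, const std::vector<std::string> & etags)
    {
        if (etags.empty())
            throw std::runtime_error("no parts to complete");
    }
    void abortMultipartUpload(const std::string &) {}
};

struct MultipartGuard
{
    FakeClient & client;
    std::optional<std::string> upload_id;
    std::vector<std::string> etags;
    bool finished = false;

    explicit MultipartGuard(FakeClient & client_) : client(client_) {}

    void writePart(const std::string & data)
    {
        // The upload is created lazily, on the first part.
        if (!upload_id)
            upload_id = client.createMultipartUpload();
        etags.push_back(client.uploadPart(data, etags.size() + 1));
    }

    void finalize()
    {
        if (upload_id)
            client.completeMultipartUpload(*upload_id, etags);
        finished = true;
    }

    ~MultipartGuard()
    {
        // Mirrors the intent of ~WriteBufferFromS3: a started but unfinished
        // upload is aborted so orphaned parts are not left behind in the bucket.
        if (upload_id && !finished)
            client.abortMultipartUpload(*upload_id);
    }
};

int main()
{
    FakeClient client;
    MultipartGuard upload(client);
    upload.writePart("first chunk");
    upload.writePart("second chunk");
    upload.finalize();
}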
- bg_tasks_condvar.notify_one(); - }; - - try - { - fillUploadRequest(task->req); - - schedule([this, task, task_finish_notify]() - { - try - { - processUploadRequest(*task); - } - catch (...) - { - task->exception = std::current_exception(); - } - - task_finish_notify(); - }, 0); - } - catch (...) - { - task_finish_notify(); - throw; - } - } - else - { - UploadPartTask task; - auto & tags = TSA_SUPPRESS_WARNING_FOR_WRITE(part_tags); /// Suppress warning because schedule == false. - - fillUploadRequest(task.req); - processUploadRequest(task); - tags.push_back(task.tag); - } + LOG_WARNING(log, "Multipart upload has aborted successfully. {}", getLogDetails()); } -void WriteBufferFromS3::fillUploadRequest(S3::UploadPartRequest & req) +S3::UploadPartRequest WriteBufferFromS3::getUploadRequest(size_t part_number, PartData & data) { - /// Increase part number. - ++part_number; + ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Bytes, data.data_size); + LOG_TRACE(log, "fillUploadRequest, size {}, key: {}", data.data_size, key); - auto max_part_number = upload_settings.max_part_number; - - if (!multipart_upload_id.empty() && (part_number > max_part_number)) - { - throw Exception( - ErrorCodes::INVALID_CONFIG_PARAMETER, - "Part number exceeded {} while writing {} bytes to S3. " - "Check min_upload_part_size = {}, max_upload_part_size = {}, " - "upload_part_size_multiply_factor = {}, upload_part_size_multiply_parts_count_threshold = {}, " - "max_single_part_upload_size = {}", - max_part_number, count(), - upload_settings.min_upload_part_size, upload_settings.max_upload_part_size, - upload_settings.upload_part_size_multiply_factor, - upload_settings.upload_part_size_multiply_parts_count_threshold, - upload_settings.max_single_part_upload_size); - } + S3::UploadPartRequest req; /// Setup request. req.SetBucket(bucket); req.SetKey(key); req.SetPartNumber(static_cast(part_number)); req.SetUploadId(multipart_upload_id); - req.SetContentLength(temporary_buffer->tellp()); - req.SetBody(temporary_buffer); - + req.SetContentLength(data.data_size); + req.SetBody(data.createAwsBuffer()); /// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840 req.SetContentType("binary/octet-stream"); - if (!strict_upload_part_size) - { - /// Maybe increase `current_upload_part_size` (we need to increase it sometimes to keep `part_number` less or equal than `max_part_number`). 
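The removed comment above hints at why part sizes must grow at all: with a hard limit of 10000 parts per multipart upload, a fixed part size caps the maximum object size. A rough, self-contained calculation under assumed settings (illustrative values, not the server defaults; the 5 GiB per-part cap stands in for max_upload_part_size):

#include <algorithm>
#include <cstddef>
#include <cstdio>

int main()
{
    const size_t max_parts = 10000;               // parts per upload allowed by the S3 protocol
    const size_t start_part_size = 16 << 20;      // assumed starting part size, 16 MiB
    const size_t multiply_factor = 2;
    const size_t multiply_threshold = 1000;       // grow the part size every 1000 parts
    const size_t max_part_size = size_t(5) << 30; // assumed per-part cap, 5 GiB

    // Fixed schedule: every part has the starting size.
    size_t fixed_total = max_parts * start_part_size;

    // Growing schedule: multiply the part size every `multiply_threshold` parts, with a cap.
    size_t growing_total = 0;
    size_t part_size = start_part_size;
    for (size_t part = 1; part <= max_parts; ++part)
    {
        if (part > 1 && (part - 1) % multiply_threshold == 0)
            part_size = std::min(part_size * multiply_factor, max_part_size);
        growing_total += part_size;
    }

    std::printf("fixed schedule:   %zu GiB max object size\n", fixed_total >> 30);
    std::printf("growing schedule: %zu GiB max object size\n", growing_total >> 30);
}

With these placeholder numbers the fixed schedule tops out around 156 GiB while the growing schedule reaches multiple TiB, which is why the policy keeps part_number below max_part_number by enlarging parts instead of adding more of them.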
- auto threshold = upload_settings.upload_part_size_multiply_parts_count_threshold; - if (!multipart_upload_id.empty() && (part_number % threshold == 0)) - { - auto max_upload_part_size = upload_settings.max_upload_part_size; - auto upload_part_size_multiply_factor = upload_settings.upload_part_size_multiply_factor; - current_upload_part_size *= upload_part_size_multiply_factor; - current_upload_part_size = std::min(current_upload_part_size, max_upload_part_size); - } - } + return req; } -void WriteBufferFromS3::processUploadRequest(UploadPartTask & task) +void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data) { - ProfileEvents::increment(ProfileEvents::S3UploadPart); - if (write_settings.for_object_storage) - ProfileEvents::increment(ProfileEvents::DiskS3UploadPart); - - ResourceCost cost = task.req.GetContentLength(); - ResourceGuard rlock(write_settings.resource_link, cost); - Stopwatch watch; - auto outcome = client_ptr->UploadPart(task.req); - watch.stop(); - rlock.unlock(); // Avoid acquiring other locks under resource lock - - ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds()); - - if (outcome.IsSuccess()) + if (data.data_size == 0) { - task.tag = outcome.GetResult().GetETag(); - std::lock_guard lock(bg_tasks_mutex); /// Protect part_tags from race - LOG_TRACE(log, "Writing part finished. Bucket: {}, Key: {}, Upload_id: {}, Etag: {}, Parts: {}", bucket, key, multipart_upload_id, task.tag, part_tags.size()); + LOG_TRACE(log, "Skipping writing part as empty."); + return; } - else + + multipart_tags.push_back({}); + size_t part_number = multipart_tags.size(); + LOG_TRACE(log, "WritePart. {}, part size: {}, part number: {}", getLogDetails(), data.data_size, part_number); + + if (multipart_upload_id.empty()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Unable to write a part without multipart_upload_id, details: WriteBufferFromS3 created for bucket {}, key {}", + bucket, key); + + if (part_number > upload_settings.max_part_number) { - ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1); - write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure - throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType()); + throw Exception( + ErrorCodes::INVALID_CONFIG_PARAMETER, + "Part number exceeded {} while writing {} bytes to S3. Check min_upload_part_size = {}, max_upload_part_size = {}, " + "upload_part_size_multiply_factor = {}, upload_part_size_multiply_parts_count_threshold = {}, max_single_part_upload_size = {}", + upload_settings.max_part_number, count(), upload_settings.min_upload_part_size, upload_settings.max_upload_part_size, + upload_settings.upload_part_size_multiply_factor, upload_settings.upload_part_size_multiply_parts_count_threshold, + upload_settings.max_single_part_upload_size); } + + if (data.data_size > upload_settings.max_upload_part_size) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Part size exceeded max_upload_part_size, part number: {}, part size {}, max_upload_part_size {}, {}", + part_number, + data.data_size, + upload_settings.max_upload_part_size, + getLogDetails()); + } + + auto req = getUploadRequest(part_number, data); + auto worker_data = std::make_shared>(std::move(req), std::move(data)); + + auto upload_worker = [&, worker_data, part_number] () + { + LOG_TEST(log, "Writing part started. 
bucket {}, key {}, part id {}", bucket, key, part_number); + + ProfileEvents::increment(ProfileEvents::S3UploadPart); + if (write_settings.for_object_storage) + ProfileEvents::increment(ProfileEvents::DiskS3UploadPart); + + auto & request = std::get<0>(*worker_data); + + ResourceCost cost = request.GetContentLength(); + ResourceGuard rlock(write_settings.resource_link, cost); + Stopwatch watch; + auto outcome = client_ptr->UploadPart(request); + watch.stop(); + rlock.unlock(); // Avoid acquiring other locks under resource lock + + ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds()); + + if (!outcome.IsSuccess()) + { + ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1); + write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure + throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType()); + } + + multipart_tags[part_number-1] = outcome.GetResult().GetETag(); + + LOG_TEST(log, "Writing part finished. bucket {}, key{}, part id {}, etag {}", bucket, key, part_number, multipart_tags[part_number-1]); + }; + + task_tracker->add(std::move(upload_worker)); } void WriteBufferFromS3::completeMultipartUpload() { - const auto & tags = TSA_SUPPRESS_WARNING_FOR_READ(part_tags); + LOG_TRACE(log, "Completing multipart upload. {}, Parts: {}", getLogDetails(), multipart_tags.size()); - LOG_TRACE(log, "Completing multipart upload. Bucket: {}, Key: {}, Upload_id: {}, Parts: {}", bucket, key, multipart_upload_id, tags.size()); + if (multipart_tags.empty()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Failed to complete multipart upload. No parts have uploaded"); - if (tags.empty()) - throw Exception(ErrorCodes::S3_ERROR, "Failed to complete multipart upload. No parts have uploaded"); + for (size_t i = 0; i < multipart_tags.size(); ++i) + { + const auto tag = multipart_tags.at(i); + if (tag.empty()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Failed to complete multipart upload. Part {} haven't been uploaded.", i); + } S3::CompleteMultipartUploadRequest req; req.SetBucket(bucket); @@ -452,10 +606,10 @@ void WriteBufferFromS3::completeMultipartUpload() req.SetUploadId(multipart_upload_id); Aws::S3::Model::CompletedMultipartUpload multipart_upload; - for (size_t i = 0; i < tags.size(); ++i) + for (size_t i = 0; i < multipart_tags.size(); ++i) { Aws::S3::Model::CompletedPart part; - multipart_upload.AddParts(part.WithETag(tags[i]).WithPartNumber(static_cast(i + 1))); + multipart_upload.AddParts(part.WithETag(multipart_tags[i]).WithPartNumber(static_cast(i + 1))); } req.SetMultipartUpload(multipart_upload); @@ -475,26 +629,24 @@ void WriteBufferFromS3::completeMultipartUpload() if (outcome.IsSuccess()) { - LOG_TRACE(log, "Multipart upload has completed. Bucket: {}, Key: {}, Upload_id: {}, Parts: {}", bucket, key, multipart_upload_id, tags.size()); + LOG_TRACE(log, "Multipart upload has completed. {}, Parts: {}", getLogDetails(), multipart_tags.size()); return; } + + ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1); + + if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) + { + /// For unknown reason, at least MinIO can respond with NO_SUCH_KEY for put requests + /// BTW, NO_SUCH_UPLOAD is expected error and we shouldn't retry it + LOG_INFO(log, "Multipart upload failed with NO_SUCH_KEY error, will retry. 
{}, Parts: {}", getLogDetails(), multipart_tags.size()); + } else { - ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1); - - if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) - { - /// For unknown reason, at least MinIO can respond with NO_SUCH_KEY for put requests - /// BTW, NO_SUCH_UPLOAD is expected error and we shouldn't retry it - LOG_INFO(log, "Multipart upload failed with NO_SUCH_KEY error for Bucket: {}, Key: {}, Upload_id: {}, Parts: {}, will retry", bucket, key, multipart_upload_id, tags.size()); - } - else - { - throw S3Exception( - outcome.GetError().GetErrorType(), - "Message: {}, Key: {}, Bucket: {}, Tags: {}", - outcome.GetError().GetMessage(), key, bucket, fmt::join(tags.begin(), tags.end(), " ")); - } + throw S3Exception( + outcome.GetError().GetErrorType(), + "Message: {}, Key: {}, Bucket: {}, Tags: {}", + outcome.GetError().GetMessage(), key, bucket, fmt::join(multipart_tags.begin(), multipart_tags.end(), " ")); } } @@ -504,73 +656,17 @@ void WriteBufferFromS3::completeMultipartUpload() max_retry, key, bucket); } -void WriteBufferFromS3::makeSinglepartUpload() +S3::PutObjectRequest WriteBufferFromS3::getPutRequest(PartData & data) { - auto size = temporary_buffer->tellp(); - bool with_pool = static_cast(schedule); + ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Bytes, data.data_size); + LOG_TRACE(log, "getPutRequest, size {}, key {}", data.data_size, key); - LOG_TRACE(log, "Making single part upload. Bucket: {}, Key: {}, Size: {}, WithPool: {}", bucket, key, size, with_pool); + S3::PutObjectRequest req; - if (size < 0) - { - LOG_WARNING(log, "Skipping single part upload. Buffer is in bad state, it mean that we have tried to upload something, but got an exception."); - return; - } - - if (schedule) - { - put_object_task = std::make_unique(); - - /// Notify waiting thread when put object task finished - auto task_notify_finish = [&]() - { - std::lock_guard lock(bg_tasks_mutex); - put_object_task->is_finished = true; - - /// Notification under mutex is important here. - /// Othervies, WriteBuffer could be destroyed in between - /// Releasing lock and condvar notification. - bg_tasks_condvar.notify_one(); - }; - - try - { - fillPutRequest(put_object_task->req); - - schedule([this, task_notify_finish]() - { - try - { - processPutRequest(*put_object_task); - } - catch (...) - { - put_object_task->exception = std::current_exception(); - } - - task_notify_finish(); - }, 0); - } - catch (...) 
- { - task_notify_finish(); - throw; - } - } - else - { - PutObjectTask task; - fillPutRequest(task.req); - processPutRequest(task); - } -} - -void WriteBufferFromS3::fillPutRequest(S3::PutObjectRequest & req) -{ req.SetBucket(bucket); req.SetKey(key); - req.SetContentLength(temporary_buffer->tellp()); - req.SetBody(temporary_buffer); + req.SetContentLength(data.data_size); + req.SetBody(data.createAwsBuffer()); if (object_metadata.has_value()) req.SetMetadata(object_metadata.value()); if (!upload_settings.storage_class_name.empty()) @@ -580,121 +676,73 @@ void WriteBufferFromS3::fillPutRequest(S3::PutObjectRequest & req) req.SetContentType("binary/octet-stream"); client_ptr->setKMSHeaders(req); + + return req; } -void WriteBufferFromS3::processPutRequest(const PutObjectTask & task) +void WriteBufferFromS3::makeSinglepartUpload(WriteBufferFromS3::PartData && data) { - size_t max_retry = std::max(request_settings.max_unexpected_write_error_retries, 1UL); - for (size_t i = 0; i < max_retry; ++i) + LOG_TRACE(log, "Making single part upload. {}.", getLogDetails()); + + auto req = getPutRequest(data); + auto worker_data = std::make_shared>(std::move(req), std::move(data)); + + auto upload_worker = [&, worker_data] () { - ProfileEvents::increment(ProfileEvents::S3PutObject); - if (write_settings.for_object_storage) - ProfileEvents::increment(ProfileEvents::DiskS3PutObject); + LOG_TEST(log, "writing single part upload started. bucket {}, key {}", bucket, key); - ResourceCost cost = task.req.GetContentLength(); - ResourceGuard rlock(write_settings.resource_link, cost); - Stopwatch watch; - auto outcome = client_ptr->PutObject(task.req); - watch.stop(); - rlock.unlock(); + auto & request = std::get<0>(*worker_data); + size_t content_length = request.GetContentLength(); - ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds()); - - bool with_pool = static_cast(schedule); - if (outcome.IsSuccess()) - { - LOG_TRACE(log, "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}, WithPool: {}", bucket, key, task.req.GetContentLength(), with_pool); - return; - } - else + size_t max_retry = std::max(request_settings.max_unexpected_write_error_retries, 1UL); + for (size_t i = 0; i < max_retry; ++i) { + ProfileEvents::increment(ProfileEvents::S3PutObject); + if (write_settings.for_object_storage) + ProfileEvents::increment(ProfileEvents::DiskS3PutObject); + + ResourceCost cost = request.GetContentLength(); + ResourceGuard rlock(write_settings.resource_link, cost); + Stopwatch watch; + auto outcome = client_ptr->PutObject(request); + watch.stop(); + rlock.unlock(); + + ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds()); + + if (outcome.IsSuccess()) + { + LOG_TRACE(log, "Single part upload has completed. 
bucket {}, key {}, object size {}", bucket, key, content_length); + return; + } + ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1); + write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure + if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) { - write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure + /// For unknown reason, at least MinIO can respond with NO_SUCH_KEY for put requests - LOG_INFO(log, "Single part upload failed with NO_SUCH_KEY error for Bucket: {}, Key: {}, Object size: {}, WithPool: {}, will retry", bucket, key, task.req.GetContentLength(), with_pool); + LOG_INFO(log, "Single part upload failed with NO_SUCH_KEY error for bucket {}, key {}, object size {}, will retry", bucket, key, content_length); } else { - write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure + LOG_ERROR(log, "S3Exception name {}, Message: {}, bucket {}, key {}, object size {}", + outcome.GetError().GetExceptionName(), outcome.GetError().GetMessage(), bucket, key, content_length); throw S3Exception( outcome.GetError().GetErrorType(), - "Message: {}, Key: {}, Bucket: {}, Object size: {}, WithPool: {}", - outcome.GetError().GetMessage(), key, bucket, task.req.GetContentLength(), with_pool); + "Message: {}, bucket {}, key {}, object size {}", + outcome.GetError().GetMessage(), bucket, key, content_length); } } - } - throw S3Exception( - Aws::S3::S3Errors::NO_SUCH_KEY, - "Message: Single part upload failed with NO_SUCH_KEY error, retries {}, Key: {}, Bucket: {}", - max_retry, key, bucket); -} + throw S3Exception( + Aws::S3::S3Errors::NO_SUCH_KEY, + "Message: Single part upload failed with NO_SUCH_KEY error, retries {}, Key: {}, Bucket: {}", + max_retry, key, bucket); + }; -void WriteBufferFromS3::waitForReadyBackgroundTasks() -{ - if (schedule) - { - std::unique_lock lock(bg_tasks_mutex); - - /// Suppress warnings because bg_tasks_mutex is actually hold, but tsa annotations do not understand std::unique_lock - auto & tasks = TSA_SUPPRESS_WARNING_FOR_WRITE(upload_object_tasks); - - while (!tasks.empty() && tasks.front().is_finished) - { - auto & task = tasks.front(); - auto exception = task.exception; - auto tag = std::move(task.tag); - tasks.pop_front(); - - if (exception) - { - waitForAllBackgroundTasksUnlocked(lock); - std::rethrow_exception(exception); - } - - TSA_SUPPRESS_WARNING_FOR_WRITE(part_tags).push_back(tag); - } - } -} - -void WriteBufferFromS3::waitForAllBackgroundTasks() -{ - if (schedule) - { - std::unique_lock lock(bg_tasks_mutex); - waitForAllBackgroundTasksUnlocked(lock); - } -} - -void WriteBufferFromS3::waitForAllBackgroundTasksUnlocked(std::unique_lock & bg_tasks_lock) -{ - if (schedule) - { - bg_tasks_condvar.wait(bg_tasks_lock, [this]() {return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); }); - - /// Suppress warnings because bg_tasks_mutex is actually hold, but tsa annotations do not understand std::unique_lock - auto & tasks = TSA_SUPPRESS_WARNING_FOR_WRITE(upload_object_tasks); - while (!tasks.empty()) - { - auto & task = tasks.front(); - - if (task.exception) - std::rethrow_exception(task.exception); - - TSA_SUPPRESS_WARNING_FOR_WRITE(part_tags).push_back(task.tag); - - tasks.pop_front(); - } - - if (put_object_task) - { - bg_tasks_condvar.wait(bg_tasks_lock, [this]() { return put_object_task->is_finished; }); - if 
(put_object_task->exception) - std::rethrow_exception(put_object_task->exception); - } - } + task_tracker->add(std::move(upload_worker)); } } diff --git a/src/IO/WriteBufferFromS3.h b/src/IO/WriteBufferFromS3.h index 2374f1502f5..13ed151ad57 100644 --- a/src/IO/WriteBufferFromS3.h +++ b/src/IO/WriteBufferFromS3.h @@ -4,20 +4,16 @@ #if USE_AWS_S3 -#include -#include -#include - #include #include #include #include -#include #include #include -#include - +#include +#include +#include namespace Aws::S3 { @@ -27,8 +23,6 @@ class Client; namespace DB { -class WriteBufferFromFile; - /** * Buffer to write a data to a S3 object with specified bucket and key. * If data size written to the buffer is less than 'max_single_part_upload_size' write is performed using singlepart upload. @@ -45,81 +39,74 @@ public: const String & key_, const S3Settings::RequestSettings & request_settings_, std::optional> object_metadata_ = std::nullopt, - size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, ThreadPoolCallbackRunner schedule_ = {}, const WriteSettings & write_settings_ = {}); ~WriteBufferFromS3() override; - void nextImpl() override; - void preFinalize() override; private: - void allocateBuffer(); - - void processWithStrictParts(); - void processWithDynamicParts(); - - void fillCreateMultipartRequest(S3::CreateMultipartUploadRequest & req); - void createMultipartUpload(); - void writePart(); - void completeMultipartUpload(); - - void makeSinglepartUpload(); - /// Receives response from the server after sending all data. void finalizeImpl() override; - struct UploadPartTask; - void fillUploadRequest(S3::UploadPartRequest & req); - void processUploadRequest(UploadPartTask & task); + String getLogDetails() const; - struct PutObjectTask; - void fillPutRequest(S3::PutObjectRequest & req); - void processPutRequest(const PutObjectTask & task); + struct PartData; + void hidePartialData(); + void allocateFirstBuffer(); + void reallocateFirstBuffer(); + void detachBuffer(); + void allocateBuffer(); + void setFakeBufferWhenPreFinalized(); - void waitForReadyBackgroundTasks(); - void waitForAllBackgroundTasks(); - void waitForAllBackgroundTasksUnlocked(std::unique_lock & bg_tasks_lock); + S3::UploadPartRequest getUploadRequest(size_t part_number, PartData & data); + void writePart(PartData && data); + void writeMultipartUpload(); + void createMultipartUpload(); + void completeMultipartUpload(); + void abortMultipartUpload(); + void tryToAbortMultipartUpload(); + + S3::PutObjectRequest getPutRequest(PartData & data); + void makeSinglepartUpload(PartData && data); const String bucket; const String key; const S3Settings::RequestSettings request_settings; const S3Settings::RequestSettings::PartUploadSettings & upload_settings; + const WriteSettings write_settings; const std::shared_ptr client_ptr; const std::optional> object_metadata; + Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3"); - /// Strict/static Part size, no adjustments will be done on fly. - size_t strict_upload_part_size = 0; - /// Part size will be adjusted on fly (for bigger uploads) - size_t current_upload_part_size = 0; - std::shared_ptr temporary_buffer; /// Buffer to accumulate data. - size_t last_part_size = 0; - size_t part_number = 0; + struct BufferAllocationPolicy; + std::unique_ptr buffer_allocation_policy; /// Upload in S3 is made in parts. /// We initiate upload, then upload each part and get ETag as a response, and then finalizeImpl() upload with listing all our parts. 
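As a companion to the comment above about collecting one ETag per part and listing them at completion, here is a small sketch, using only standard-library types, of the slot-per-part bookkeeping that the new writePart()/completeMultipartUpload() pair relies on. TagBook is a hypothetical name for illustration; the real code keeps the tags in `multipart_tags` and schedules the workers through TaskTracker.

#include <deque>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

struct TagBook
{
    std::deque<std::string> tags;

    // Reserve a slot up front and return the 1-based part number together with
    // a setter the upload worker calls once UploadPart has returned an ETag.
    std::pair<size_t, std::function<void(std::string)>> reserveSlot()
    {
        tags.push_back({});
        size_t part_number = tags.size();
        return {part_number, [this, part_number](std::string etag) { tags[part_number - 1] = std::move(etag); }};
    }

    // Completion refuses to proceed while any slot is still empty.
    std::vector<std::string> finishedTags() const
    {
        for (size_t i = 0; i < tags.size(); ++i)
            if (tags[i].empty())
                throw std::runtime_error("part " + std::to_string(i + 1) + " has not been uploaded");
        return {tags.begin(), tags.end()};
    }
};

int main()
{
    TagBook book;
    auto [first_part, set_first] = book.reserveSlot();
    auto [second_part, set_second] = book.reserveSlot();

    // Workers may finish out of order; slots are addressed by part number.
    set_second("etag-b");
    set_first("etag-a");

    auto tags = book.finishedTags(); // would throw if any part were still pending
    return (first_part == 1 && second_part == 2 && tags.size() == 2) ? 0 : 1;
}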
String multipart_upload_id; - std::vector TSA_GUARDED_BY(bg_tasks_mutex) part_tags; + std::deque multipart_tags; + bool multipart_upload_finished = false; + /// Track that prefinalize() is called only once bool is_prefinalized = false; - /// Following fields are for background uploads in thread pool (if specified). - /// We use std::function to avoid dependency of Interpreters - const ThreadPoolCallbackRunner schedule; + /// First fully filled buffer has to be delayed + /// There are two ways after: + /// First is to call prefinalize/finalize, which leads to single part upload + /// Second is to write more data, which leads to multi part upload + std::deque detached_part_data; + char fake_buffer_when_prefinalized[1] = {}; - std::unique_ptr put_object_task; /// Does not need protection by mutex because of the logic around is_finished field. - std::list TSA_GUARDED_BY(bg_tasks_mutex) upload_object_tasks; - int num_added_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0; - int num_finished_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0; + /// offset() and count() are unstable inside nextImpl + /// For example nextImpl changes position hence offset() and count() is changed + /// This vars are dedicated to store information about sizes when offset() and count() are unstable + size_t total_size = 0; + size_t hidden_size = 0; - std::mutex bg_tasks_mutex; - std::condition_variable bg_tasks_condvar; - - Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3"); - - WriteSettings write_settings; + class TaskTracker; + std::unique_ptr task_tracker; }; } diff --git a/src/IO/WriteBufferFromS3MemoryStream.cpp b/src/IO/WriteBufferFromS3MemoryStream.cpp new file mode 100644 index 00000000000..6271f15f055 --- /dev/null +++ b/src/IO/WriteBufferFromS3MemoryStream.cpp @@ -0,0 +1,68 @@ +#include "config.h" + +#if USE_AWS_S3 + +#include + +namespace DB +{ + +MemoryStream::MemoryBuf::MemoryBuf(char * begin_, size_t size_) + : begin(begin_) + , size(size_) +{ + this->setg(begin, begin, begin + size); +} + +MemoryStream::MemoryBuf::int_type MemoryStream::MemoryBuf::underflow() +{ + if (gptr() < egptr()) + return traits_type::to_int_type(*gptr()); + return traits_type::eof(); +} + +MemoryStream::MemoryBuf::pos_type MemoryStream::MemoryBuf::seekoff(off_type off, std::ios_base::seekdir way, + std::ios_base::openmode mode) +{ + bool out_mode = (std::ios_base::out & mode) != 0; + if (out_mode) + return off_type(-1); + + off_type ret(-1); + + if (way == std::ios_base::beg) + ret = 0; + else if (way == std::ios_base::cur) + ret = gptr() - begin; + else if (way == std::ios_base::end) + ret = size; + + if (ret == off_type(-1)) + return ret; + + ret += off; + if (!(ret >= 0 && size_t(ret) <= size)) + return off_type(-1); + + this->setg(begin, begin + ret, begin + size); + + return pos_type(ret); +} + +MemoryStream::MemoryBuf::pos_type MemoryStream::MemoryBuf::seekpos(pos_type sp, + std::ios_base::openmode mode) +{ + return seekoff(off_type(sp), std::ios_base::beg, mode); +} + +MemoryStream::MemoryStream(char * begin_, size_t size_) + : std::iostream(nullptr) + , mem_buf(begin_, size_) +{ + init(&mem_buf); +} + +} + +#endif + diff --git a/src/IO/WriteBufferFromS3MemoryStream.h b/src/IO/WriteBufferFromS3MemoryStream.h new file mode 100644 index 00000000000..5a7cc17705d --- /dev/null +++ b/src/IO/WriteBufferFromS3MemoryStream.h @@ -0,0 +1,39 @@ +#pragma once + +#include "config.h" + +#if USE_AWS_S3 + +#include "WriteBufferFromS3.h" + +#include + +namespace DB +{ + +struct MemoryStream: std::iostream +{ + struct MemoryBuf: 
std::streambuf + { + MemoryBuf(char * begin_, size_t size_); + + int_type underflow() override; + + pos_type seekoff(off_type off, std::ios_base::seekdir way, + std::ios_base::openmode mode) override; + + pos_type seekpos(pos_type sp, + std::ios_base::openmode mode) override; + + char * begin = nullptr; + size_t size = 0; + }; + + MemoryStream(char * begin_, size_t size_); + + MemoryBuf mem_buf; +}; + +} + +#endif diff --git a/src/IO/WriteBufferFromS3TaskTracker.cpp b/src/IO/WriteBufferFromS3TaskTracker.cpp new file mode 100644 index 00000000000..0769f7731c2 --- /dev/null +++ b/src/IO/WriteBufferFromS3TaskTracker.cpp @@ -0,0 +1,137 @@ +#include "config.h" + +#if USE_AWS_S3 + +#include + +namespace DB +{ + +WriteBufferFromS3::TaskTracker::TaskTracker(ThreadPoolCallbackRunner scheduler_) + : is_async(bool(scheduler_)) + , scheduler(scheduler_ ? std::move(scheduler_) : syncRunner()) +{} + +WriteBufferFromS3::TaskTracker::~TaskTracker() +{ + safeWaitAll(); +} + +ThreadPoolCallbackRunner WriteBufferFromS3::TaskTracker::syncRunner() +{ + return [](Callback && callback, int64_t) mutable -> std::future + { + auto package = std::packaged_task(std::move(callback)); + /// No exceptions are propagated, exceptions are packed to future + package(); + return package.get_future(); + }; +} + +void WriteBufferFromS3::TaskTracker::getReady() +{ + LOG_TEST(log, "getReady, in queue {}", futures.size()); + + /// Exceptions are propagated + auto it = futures.begin(); + while (it != futures.end()) + { + chassert(it->valid()); + if (it->wait_for(std::chrono::seconds(0)) != std::future_status::ready) + { + ++it; + continue; + } + + try + { + it->get(); + } catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + throw; + } + + it = futures.erase(it); + } + + LOG_TEST(log, "getReady ended, in queue {}", futures.size()); +} + +void WriteBufferFromS3::TaskTracker::getAll() +{ + LOG_TEST(log, "getAll, in queue {}", futures.size()); + + /// Exceptions are propagated + for (auto & future : futures) + { + try + { + future.get(); + } catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + throw; + } + } + futures.clear(); +} + +void WriteBufferFromS3::TaskTracker::safeWaitAll() +{ + LOG_TEST(log, "safeWaitAll, wait in queue {}", futures.size()); + + /// Exceptions are not propagated + for (auto & future : futures) + { + LOG_TEST(log, "safeWaitAll, wait future"); + + if (future.valid()) + future.wait(); + } + + LOG_TEST(log, "safeWaitAll, get in queue {}", futures.size()); + + for (auto & future : futures) + { + if (future.valid()) + { + try + { + future.get(); + } catch (...) 
+ { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } + } + futures.clear(); + LOG_TEST(log, "safeWaitAll ended, get in queue {}", futures.size()); +} + +void WriteBufferFromS3::TaskTracker::add(Callback && func) +{ + LOG_TEST(log, "add, in queue {}", futures.size()); + + auto future = scheduler(std::move(func), 0); + auto exit_scope = scope_guard( + [&future]() + { + future.wait(); + } + ); + + futures.push_back(std::move(future)); + + exit_scope.release(); + LOG_TEST(log, "add ended, in queue {}", futures.size()); +} + +bool WriteBufferFromS3::TaskTracker::isAsync() const +{ + return is_async; +} + +} + +#endif diff --git a/src/IO/WriteBufferFromS3TaskTracker.h b/src/IO/WriteBufferFromS3TaskTracker.h new file mode 100644 index 00000000000..fa214a4f8c5 --- /dev/null +++ b/src/IO/WriteBufferFromS3TaskTracker.h @@ -0,0 +1,37 @@ +#pragma once + +#include "config.h" + +#if USE_AWS_S3 + +#include "WriteBufferFromS3.h" + +namespace DB +{ + +class WriteBufferFromS3::TaskTracker +{ +public: + using Callback = std::function; + + explicit TaskTracker(ThreadPoolCallbackRunner scheduler_); + ~TaskTracker(); + + static ThreadPoolCallbackRunner syncRunner(); + + bool isAsync() const; + void getReady(); + void getAll(); + void safeWaitAll(); + void add(Callback && func); + +private: + bool is_async; + ThreadPoolCallbackRunner scheduler; + std::list> futures; + Poco::Logger * log = &Poco::Logger::get("TaskTracker"); +}; + +} + +#endif diff --git a/src/IO/tests/gtest_writebuffer_s3.cpp b/src/IO/tests/gtest_writebuffer_s3.cpp new file mode 100644 index 00000000000..d7661d3e3d0 --- /dev/null +++ b/src/IO/tests/gtest_writebuffer_s3.cpp @@ -0,0 +1,1114 @@ +#include + +#include "config.h" + +#if USE_AWS_S3 + +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int S3_ERROR; +} + +} + +namespace MockS3 +{ + +class Sequencer +{ +public: + size_t next() { return counter++; } + std::string next_id() + { + std::stringstream ss; + ss << "id-" << next(); + return ss.str(); + } + +private: + size_t counter = 0; +}; + +class BucketMemStore +{ +public: + typedef std::string Key; + typedef std::string Data; + typedef std::string ETag; + typedef std::string MPU_ID; + typedef std::map MPUPartsInProgress; + typedef std::vector MPUParts; + + + std::map objects; + std::map multiPartUploads; + std::vector> CompletedPartUploads; + + Sequencer sequencer; + + std::string CreateMPU() + { + auto id = sequencer.next_id(); + multiPartUploads.emplace(id, MPUPartsInProgress{}); + return id; + } + + std::string UploadPart(const std::string & upload_id, const std::string & part) + { + auto etag = sequencer.next_id(); + auto & parts = multiPartUploads.at(upload_id); + parts.emplace(etag, part); + return etag; + } + + void PutObject(const std::string & key, const std::string & data) + { + objects[key] = data; + } + + void CompleteMPU(const std::string & key, const std::string & upload_id, const std::vector & etags) + { + MPUParts completedParts; + completedParts.reserve(etags.size()); + + auto & parts = multiPartUploads.at(upload_id); + for (const auto & tag: etags) { + completedParts.push_back(parts.at(tag)); + } + + std::stringstream file_data; + for (const auto & part_data: completedParts) { + file_data << part_data; + } + + CompletedPartUploads.emplace_back(upload_id, std::move(completedParts)); 
+ objects[key] = file_data.str(); + multiPartUploads.erase(upload_id); + } + + void AbortMPU(const std::string & upload_id) + { + multiPartUploads.erase(upload_id); + } + + + const std::vector> & GetCompletedPartUploads() const + { + return CompletedPartUploads; + } + + static std::vector GetPartSizes(const MPUParts & parts) + { + std::vector result; + result.reserve(parts.size()); + for (auto & part_data : parts) + result.push_back(part_data.size()); + + return result; + } + +}; + +class S3MemStrore +{ +public: + void CreateBucket(const std::string & bucket) + { + assert(buckets.count(bucket) == 0); + buckets.emplace(bucket, BucketMemStore{}); + } + + BucketMemStore& GetBucketStore(const std::string & bucket) { + return buckets.at(bucket); + } + +private: + std::map buckets; +}; + +struct EventCounts +{ + size_t headObject = 0; + size_t getObject = 0; + size_t putObject = 0; + size_t multiUploadCreate = 0; + size_t multiUploadComplete = 0; + size_t multiUploadAbort = 0; + size_t uploadParts = 0; + size_t writtenSize = 0; + + size_t totalRequestsCount() const + { + return headObject + getObject + putObject + multiUploadCreate + multiUploadComplete + uploadParts; + } +}; + +struct Client; + +struct InjectionModel +{ + virtual ~InjectionModel() = default; + +#define DeclareInjectCall(ObjectTypePart) \ + virtual std::optional call(const Aws::S3::Model::ObjectTypePart##Request & /*request*/) \ + { \ + return std::nullopt; \ + } + DeclareInjectCall(PutObject) + DeclareInjectCall(HeadObject) + DeclareInjectCall(CreateMultipartUpload) + DeclareInjectCall(CompleteMultipartUpload) + DeclareInjectCall(AbortMultipartUpload) + DeclareInjectCall(UploadPart) +#undef DeclareInjectCall +}; + +struct Client : DB::S3::Client +{ + Client(std::shared_ptr mock_s3_store) + : DB::S3::Client( + 100, + DB::S3::ServerSideEncryptionKMSConfig(), + std::make_shared("", ""), + GetClientConfiguration(), + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, + /* useVirtualAddressing = */ true) + , store(mock_s3_store) + { } + + static std::shared_ptr CreateClient(String bucket = "mock-s3-bucket") + { + auto s3store = std::make_shared(); + s3store->CreateBucket(bucket); + return std::make_shared(s3store); + } + + static DB::S3::PocoHTTPClientConfiguration GetClientConfiguration() + { + DB::RemoteHostFilter remote_host_filter; + return DB::S3::ClientFactory::instance().createClientConfiguration( + "some-region", + remote_host_filter, + /* s3_max_redirects = */ 100, + /* enable_s3_requests_logging = */ true, + /* for_disk_s3 = */ false, + /* get_request_throttler = */ {}, + /* put_request_throttler = */ {} + ); + } + + void setInjectionModel(std::shared_ptr injections_) + { + injections = injections_; + } + + Aws::S3::Model::PutObjectOutcome PutObject(const Aws::S3::Model::PutObjectRequest & request) const override + { + ++counters.putObject; + + if (injections) + { + if (auto opt_val = injections->call(request)) + { + return *opt_val; + } + } + + auto & bStore = store->GetBucketStore(request.GetBucket()); + std::stringstream data; + data << request.GetBody()->rdbuf(); + bStore.PutObject(request.GetKey(), data.str()); + counters.writtenSize += data.str().length(); + + Aws::S3::Model::PutObjectOutcome outcome; + Aws::S3::Model::PutObjectResult result(outcome.GetResultWithOwnership()); + return result; + } + + Aws::S3::Model::GetObjectOutcome GetObject(const Aws::S3::Model::GetObjectRequest & request) const override + { + ++counters.getObject; + + auto & bStore = store->GetBucketStore(request.GetBucket()); + + auto 
factory = request.GetResponseStreamFactory(); + Aws::Utils::Stream::ResponseStream responseStream(factory); + responseStream.GetUnderlyingStream() << std::stringstream(bStore.objects[request.GetKey()]).rdbuf(); + + Aws::AmazonWebServiceResult awsStream(std::move(responseStream), Aws::Http::HeaderValueCollection()); + Aws::S3::Model::GetObjectResult getObjectResult(std::move(awsStream)); + return Aws::S3::Model::GetObjectOutcome(std::move(getObjectResult)); + } + + Aws::S3::Model::HeadObjectOutcome HeadObject(const Aws::S3::Model::HeadObjectRequest & request) const override + { + ++counters.headObject; + + if (injections) + { + if (auto opt_val = injections->call(request)) + { + return std::move(*opt_val); + } + } + + auto & bStore = store->GetBucketStore(request.GetBucket()); + auto obj = bStore.objects[request.GetKey()]; + Aws::S3::Model::HeadObjectOutcome outcome; + Aws::S3::Model::HeadObjectResult result(outcome.GetResultWithOwnership()); + result.SetContentLength(obj.length()); + return result; + } + + Aws::S3::Model::CreateMultipartUploadOutcome CreateMultipartUpload(const Aws::S3::Model::CreateMultipartUploadRequest & request) const override + { + ++counters.multiUploadCreate; + + if (injections) + { + if (auto opt_val = injections->call(request)) + { + return std::move(*opt_val); + } + } + + auto & bStore = store->GetBucketStore(request.GetBucket()); + auto mpu_id = bStore.CreateMPU(); + + Aws::S3::Model::CreateMultipartUploadResult result; + result.SetUploadId(mpu_id.c_str()); + return Aws::S3::Model::CreateMultipartUploadOutcome(result); + } + + Aws::S3::Model::UploadPartOutcome UploadPart(const Aws::S3::Model::UploadPartRequest & request) const override + { + ++counters.uploadParts; + + if (injections) + { + if (auto opt_val = injections->call(request)) + { + return std::move(*opt_val); + } + } + + std::stringstream data; + data << request.GetBody()->rdbuf(); + counters.writtenSize += data.str().length(); + + auto & bStore = store->GetBucketStore(request.GetBucket()); + auto etag = bStore.UploadPart(request.GetUploadId(), data.str()); + + Aws::S3::Model::UploadPartResult result; + result.SetETag(etag); + return Aws::S3::Model::UploadPartOutcome(result); + } + + Aws::S3::Model::CompleteMultipartUploadOutcome CompleteMultipartUpload(const Aws::S3::Model::CompleteMultipartUploadRequest & request) const override + { + ++counters.multiUploadComplete; + + if (injections) + { + if (auto opt_val = injections->call(request)) + { + return std::move(*opt_val); + } + } + + auto & bStore = store->GetBucketStore(request.GetBucket()); + + std::vector etags; + for (const auto & x: request.GetMultipartUpload().GetParts()) { + etags.push_back(x.GetETag()); + } + bStore.CompleteMPU(request.GetKey(), request.GetUploadId(), etags); + + Aws::S3::Model::CompleteMultipartUploadResult result; + return Aws::S3::Model::CompleteMultipartUploadOutcome(result); + } + + Aws::S3::Model::AbortMultipartUploadOutcome AbortMultipartUpload(const Aws::S3::Model::AbortMultipartUploadRequest & request) const override + { + ++counters.multiUploadAbort; + + if (injections) + { + if (auto opt_val = injections->call(request)) + { + return std::move(*opt_val); + } + } + + auto & bStore = store->GetBucketStore(request.GetBucket()); + bStore.AbortMPU(request.GetUploadId()); + + Aws::S3::Model::AbortMultipartUploadResult result; + return Aws::S3::Model::AbortMultipartUploadOutcome(result); + } + + std::shared_ptr store; + mutable EventCounts counters; + mutable std::shared_ptr injections; + void resetCounters() const { 
counters = {}; } +}; + +struct PutObjectFailIngection: InjectionModel +{ + std::optional call(const Aws::S3::Model::PutObjectRequest & /*request*/) override + { + return Aws::Client::AWSError(Aws::Client::CoreErrors::VALIDATION, "FailInjection", "PutObjectFailIngection", false); + } +}; + +struct HeadObjectFailIngection: InjectionModel +{ + std::optional call(const Aws::S3::Model::HeadObjectRequest & /*request*/) override + { + return Aws::Client::AWSError(Aws::Client::CoreErrors::VALIDATION, "FailInjection", "HeadObjectFailIngection", false); + } +}; + +struct CreateMPUFailIngection: InjectionModel +{ + std::optional call(const Aws::S3::Model::CreateMultipartUploadRequest & /*request*/) override + { + return Aws::Client::AWSError(Aws::Client::CoreErrors::VALIDATION, "FailInjection", "CreateMPUFailIngection", false); + } +}; + +struct CompleteMPUFailIngection: InjectionModel +{ + std::optional call(const Aws::S3::Model::CompleteMultipartUploadRequest & /*request*/) override + { + return Aws::Client::AWSError(Aws::Client::CoreErrors::VALIDATION, "FailInjection", "CompleteMPUFailIngection", false); + } +}; + +struct UploadPartFailIngection: InjectionModel +{ + std::optional call(const Aws::S3::Model::UploadPartRequest & /*request*/) override + { + return Aws::Client::AWSError(Aws::Client::CoreErrors::VALIDATION, "FailInjection", "UploadPartFailIngection", false); + } +}; + +struct BaseSyncPolicy +{ + virtual ~BaseSyncPolicy() = default; + virtual DB::ThreadPoolCallbackRunner getScheduler() { return {}; } + virtual void execute(size_t = 0) {} + virtual void setAutoExecute(bool = true) {} + + virtual size_t size() const { return 0; } + virtual bool empty() const { return size() == 0; } +}; + +struct SimpleAsyncTasks : BaseSyncPolicy +{ + bool auto_execute = false; + std::deque> queue; + + virtual DB::ThreadPoolCallbackRunner getScheduler() override + { + return [this] (std::function && operation, size_t /*priority*/) + { + if (auto_execute) + { + auto task = std::packaged_task(std::move(operation)); + task(); + return task.get_future(); + } + + queue.emplace_back(std::move(operation)); + return queue.back().get_future(); + }; + } + + virtual void execute(size_t limit = 0) override + { + if (limit == 0) + limit = queue.size(); + + while (!queue.empty() && limit) + { + auto & request = queue.front(); + request(); + + queue.pop_front(); + --limit; + } + } + + virtual void setAutoExecute(bool value = true) override + { + auto_execute = value; + if (auto_execute) + execute(); + } + + virtual size_t size() const override { return queue.size(); } +}; + +} + +using namespace DB; + +void writeAsOneBlock(WriteBuffer& buf, size_t size) +{ + std::vector data(size, 'a'); + buf.write(data.data(), data.size()); +} + +void writeAsPieces(WriteBuffer& buf, size_t size) +{ + size_t ceil = 15ull*1024*1024*1024; + size_t piece = 1; + size_t written = 0; + while (written < size) { + size_t len = std::min({piece, size-written, ceil}); + writeAsOneBlock(buf, len); + written += len; + piece *= 2; + } +} + +class WBS3Test : public ::testing::Test +{ +public: + const String bucket = "WBS3Test-bucket"; + + Settings & getSettings() + { + return settings; + } + + MockS3::BaseSyncPolicy & getAsyncPolicy() + { + return *async_policy; + } + + std::unique_ptr getWriteBuffer(String file_name = "file") + { + S3Settings::RequestSettings request_settings; + request_settings.updateFromSettings(settings); + + client->resetCounters(); + + getAsyncPolicy().setAutoExecute(false); + + return std::make_unique( + client, + bucket, + 
file_name, + request_settings, + std::nullopt, + getAsyncPolicy().getScheduler()); + } + + void setInjectionModel(std::shared_ptr injections_) + { + client->setInjectionModel(injections_); + } + + void runSimpleScenario(MockS3::EventCounts expected_counters, size_t size) + { + auto scenario = [&] (std::function writeMethod) { + auto buffer = getWriteBuffer("file"); + writeMethod(*buffer, size); + + getAsyncPolicy().setAutoExecute(); + buffer->finalize(); + + expected_counters.writtenSize = size; + assertCountersEQ(expected_counters); + + auto & bStore = client->store->GetBucketStore(bucket); + auto & data = bStore.objects["file"]; + ASSERT_EQ(size, data.size()); + for (char c : data) + ASSERT_EQ('a', c); + }; + + scenario(writeAsOneBlock); + scenario(writeAsPieces); + } + + void assertCountersEQ(const MockS3::EventCounts & canonical) { + const auto & actual = client->counters; + ASSERT_EQ(canonical.headObject, actual.headObject); + ASSERT_EQ(canonical.getObject, actual.getObject); + ASSERT_EQ(canonical.putObject, actual.putObject); + ASSERT_EQ(canonical.multiUploadCreate, actual.multiUploadCreate); + ASSERT_EQ(canonical.multiUploadComplete, actual.multiUploadComplete); + ASSERT_EQ(canonical.multiUploadAbort, actual.multiUploadAbort); + ASSERT_EQ(canonical.uploadParts, actual.uploadParts); + ASSERT_EQ(canonical.writtenSize, actual.writtenSize); + } + + auto getCompletedPartUploads () + { + return client->store->GetBucketStore(bucket).GetCompletedPartUploads(); + } + +protected: + Settings settings; + + std::shared_ptr client; + std::unique_ptr async_policy; + + virtual void SetUp() override + { + client = MockS3::Client::CreateClient(bucket); + async_policy = std::make_unique(); + } + + virtual void TearDown() override + { + client.reset(); + async_policy.reset(); + } +}; + +class SyncAsync : public WBS3Test, public ::testing::WithParamInterface +{ +protected: + bool test_with_pool = false; + + virtual void SetUp() override + { + test_with_pool = GetParam(); + client = MockS3::Client::CreateClient(bucket); + if (test_with_pool) + async_policy = std::make_unique(); + else + async_policy = std::make_unique(); + } +}; + +INSTANTIATE_TEST_SUITE_P(WBS3 + , SyncAsync + , ::testing::Values(true, false) + , [] (const ::testing::TestParamInfo& info_param) { + std::string name = info_param.param ? 
"async" : "sync"; + return name; + }); + +TEST_P(SyncAsync, exception_on_head) { + setInjectionModel(std::make_shared()); + + getSettings().s3_check_objects_after_upload = true; + + EXPECT_THROW({ + try { + auto buffer = getWriteBuffer("exception_on_head_1"); + buffer->write('A'); + buffer->next(); + + getAsyncPolicy().setAutoExecute(); + buffer->finalize(); + } + catch( const DB::Exception& e ) + { + ASSERT_EQ(ErrorCodes::S3_ERROR, e.code()); + EXPECT_THAT(e.what(), testing::HasSubstr("Immediately after upload:")); + throw; + } + }, DB::S3Exception); +} + +TEST_P(SyncAsync, exception_on_put) { + setInjectionModel(std::make_shared()); + + EXPECT_THROW({ + try + { + auto buffer = getWriteBuffer("exception_on_put_1"); + buffer->write('A'); + buffer->next(); + + getAsyncPolicy().setAutoExecute(); + buffer->finalize(); + } + catch( const DB::Exception& e ) + { + ASSERT_EQ(ErrorCodes::S3_ERROR, e.code()); + EXPECT_THAT(e.what(), testing::HasSubstr("PutObjectFailIngection")); + throw; + } + }, DB::S3Exception); + + EXPECT_THROW({ + try { + auto buffer = getWriteBuffer("exception_on_put_2"); + buffer->write('A'); + + getAsyncPolicy().setAutoExecute(); + buffer->finalize(); + } + catch( const DB::Exception& e ) + { + ASSERT_EQ(ErrorCodes::S3_ERROR, e.code()); + EXPECT_THAT(e.what(), testing::HasSubstr("PutObjectFailIngection")); + throw; + } + }, DB::S3Exception); + + EXPECT_THROW({ + try { + auto buffer = getWriteBuffer("exception_on_put_3"); + buffer->write('A'); + getAsyncPolicy().setAutoExecute(); + buffer->preFinalize(); + + getAsyncPolicy().setAutoExecute(); + buffer->finalize(); + } + catch( const DB::Exception& e ) + { + ASSERT_EQ(ErrorCodes::S3_ERROR, e.code()); + EXPECT_THAT(e.what(), testing::HasSubstr("PutObjectFailIngection")); + throw; + } + }, DB::S3Exception); + +} + +TEST_P(SyncAsync, exception_on_create_mpu) { + setInjectionModel(std::make_shared()); + + getSettings().s3_max_single_part_upload_size = 0; // no single part + getSettings().s3_min_upload_part_size = 1; // small parts ara ok + + EXPECT_THROW({ + try { + auto buffer = getWriteBuffer("exception_on_create_mpu_1"); + buffer->write('A'); + buffer->next(); + buffer->write('A'); + buffer->next(); + + getAsyncPolicy().setAutoExecute(); + buffer->finalize(); + } + catch( const DB::Exception& e ) + { + ASSERT_EQ(ErrorCodes::S3_ERROR, e.code()); + EXPECT_THAT(e.what(), testing::HasSubstr("CreateMPUFailIngection")); + throw; + } + }, DB::S3Exception); + + EXPECT_THROW({ + try { + auto buffer = getWriteBuffer("exception_on_create_mpu_2"); + buffer->write('A'); + buffer->preFinalize(); + + getAsyncPolicy().setAutoExecute(); + buffer->finalize(); + } + catch( const DB::Exception& e ) + { + ASSERT_EQ(ErrorCodes::S3_ERROR, e.code()); + EXPECT_THAT(e.what(), testing::HasSubstr("CreateMPUFailIngection")); + throw; + } + }, DB::S3Exception); + + EXPECT_THROW({ + try { + auto buffer = getWriteBuffer("exception_on_create_mpu_2"); + buffer->write('A'); + + getAsyncPolicy().setAutoExecute(); + buffer->finalize(); + } + catch( const DB::Exception& e ) + { + ASSERT_EQ(ErrorCodes::S3_ERROR, e.code()); + EXPECT_THAT(e.what(), testing::HasSubstr("CreateMPUFailIngection")); + throw; + } + }, DB::S3Exception); +} + + +TEST_P(SyncAsync, exception_on_complete_mpu) { + setInjectionModel(std::make_shared()); + + getSettings().s3_max_single_part_upload_size = 0; // no single part + getSettings().s3_min_upload_part_size = 1; // small parts ara ok + + EXPECT_THROW({ + try { + auto buffer = getWriteBuffer("exception_on_complete_mpu_1"); + 
buffer->write('A'); + + getAsyncPolicy().setAutoExecute(); + buffer->finalize(); + } + catch(const DB::Exception & e) + { + ASSERT_EQ(ErrorCodes::S3_ERROR, e.code()); + EXPECT_THAT(e.what(), testing::HasSubstr("CompleteMPUFailIngection")); + throw; + } + }, DB::S3Exception); +} + +TEST_P(SyncAsync, exception_on_upload_part) { + setInjectionModel(std::make_shared()); + + getSettings().s3_max_single_part_upload_size = 0; // no single part + getSettings().s3_min_upload_part_size = 1; // small parts ara ok + + MockS3::EventCounts counters = {.multiUploadCreate = 1, .multiUploadAbort = 1}; + + counters.uploadParts = 2; + + EXPECT_THROW({ + try { + auto buffer = getWriteBuffer("exception_on_upload_part_1"); + + buffer->write('A'); + buffer->next(); + buffer->write('A'); + buffer->next(); + + getAsyncPolicy().setAutoExecute(); + + buffer->finalize(); + } + catch(const DB::Exception & e) + { + assertCountersEQ(counters); + ASSERT_EQ(ErrorCodes::S3_ERROR, e.code()); + EXPECT_THAT(e.what(), testing::HasSubstr("UploadPartFailIngection")); + throw; + } + }, DB::S3Exception); + + EXPECT_THROW({ + try { + auto buffer = getWriteBuffer("exception_on_upload_part_2"); + getAsyncPolicy().setAutoExecute(); + + buffer->write('A'); + buffer->next(); + + buffer->write('A'); + buffer->next(); + + buffer->finalize(); + } + catch(const DB::Exception & e) + { + assertCountersEQ(counters); + ASSERT_EQ(ErrorCodes::S3_ERROR, e.code()); + EXPECT_THAT(e.what(), testing::HasSubstr("UploadPartFailIngection")); + throw; + } + }, DB::S3Exception); + + counters.uploadParts = 1; + + EXPECT_THROW({ + try { + auto buffer = getWriteBuffer("exception_on_upload_part_3"); + buffer->write('A'); + + buffer->preFinalize(); + + getAsyncPolicy().setAutoExecute(); + buffer->finalize(); + } + catch(const DB::Exception & e) + { + assertCountersEQ(counters); + ASSERT_EQ(ErrorCodes::S3_ERROR, e.code()); + EXPECT_THAT(e.what(), testing::HasSubstr("UploadPartFailIngection")); + throw; + } + }, DB::S3Exception); + + EXPECT_THROW({ + try { + auto buffer = getWriteBuffer("exception_on_upload_part_4"); + buffer->write('A'); + + getAsyncPolicy().setAutoExecute(); + buffer->finalize(); + } + catch(const DB::Exception & e) + { + assertCountersEQ(counters); + ASSERT_EQ(ErrorCodes::S3_ERROR, e.code()); + EXPECT_THAT(e.what(), testing::HasSubstr("UploadPartFailIngection")); + throw; + } + }, DB::S3Exception); +} + + +TEST_F(WBS3Test, prefinalize_called_multiple_times) { +#ifdef ABORT_ON_LOGICAL_ERROR + GTEST_SKIP() << "this test trigger LOGICAL_ERROR, runs only if ABORT_ON_LOGICAL_ERROR is not defined"; +#else + EXPECT_THROW({ + try { + auto buffer = getWriteBuffer("prefinalize_called_multiple_times"); + buffer->write('A'); + buffer->next(); + buffer->preFinalize(); + buffer->write('A'); + buffer->next(); + buffer->preFinalize(); + buffer->finalize(); + } + catch(const DB::Exception & e) + { + ASSERT_EQ(ErrorCodes::LOGICAL_ERROR, e.code()); + EXPECT_THAT(e.what(), testing::HasSubstr("write to prefinalized buffer for S3")); + throw; + } + }, DB::Exception); +#endif +} + +TEST_P(SyncAsync, empty_file) { + getSettings().s3_check_objects_after_upload = true; + + MockS3::EventCounts counters = {.headObject = 2, .putObject = 1}; + runSimpleScenario(counters, 0); +} + +TEST_P(SyncAsync, manual_next_calls) { + getSettings().s3_check_objects_after_upload = true; + + { + MockS3::EventCounts counters = {.headObject = 2, .putObject = 1}; + + auto buffer = getWriteBuffer("manual_next_calls_1"); + buffer->next(); + + getAsyncPolicy().setAutoExecute(); + 
buffer->finalize(); + + assertCountersEQ(counters); + } + + { + MockS3::EventCounts counters = {.headObject = 2, .putObject = 1}; + + auto buffer = getWriteBuffer("manual_next_calls_2"); + buffer->next(); + buffer->next(); + + getAsyncPolicy().setAutoExecute(); + buffer->finalize(); + + assertCountersEQ(counters); + } + + { + MockS3::EventCounts counters = {.headObject = 2, .putObject = 1, .writtenSize = 1}; + + auto buffer = getWriteBuffer("manual_next_calls_3"); + buffer->next(); + buffer->write('A'); + buffer->next(); + + getAsyncPolicy().setAutoExecute(); + buffer->finalize(); + + assertCountersEQ(counters); + } + + { + MockS3::EventCounts counters = {.headObject = 2, .putObject = 1, .writtenSize = 2}; + + auto buffer = getWriteBuffer("manual_next_calls_4"); + buffer->write('A'); + buffer->next(); + buffer->write('A'); + buffer->next(); + buffer->next(); + + getAsyncPolicy().setAutoExecute(); + buffer->finalize(); + + assertCountersEQ(counters); + } +} + +TEST_P(SyncAsync, small_file_is_one_put_request) { + getSettings().s3_check_objects_after_upload = true; + + { + getSettings().s3_max_single_part_upload_size = 1000; + getSettings().s3_min_upload_part_size = 10; + + MockS3::EventCounts counters = {.headObject = 2, .putObject = 1}; + + runSimpleScenario(counters, 1); + runSimpleScenario(counters, getSettings().s3_max_single_part_upload_size-1); + runSimpleScenario(counters, getSettings().s3_max_single_part_upload_size); + runSimpleScenario(counters, getSettings().s3_max_single_part_upload_size/2); + } + + { + + getSettings().s3_max_single_part_upload_size = 10; + getSettings().s3_min_upload_part_size = 1000; + + MockS3::EventCounts counters = {.headObject = 2, .putObject = 1}; + + runSimpleScenario(counters, 1); + runSimpleScenario(counters, getSettings().s3_max_single_part_upload_size-1); + runSimpleScenario(counters, getSettings().s3_max_single_part_upload_size); + runSimpleScenario(counters, getSettings().s3_max_single_part_upload_size/2); + } +} + +TEST_P(SyncAsync, little_bigger_file_is_multi_part_upload) { + getSettings().s3_check_objects_after_upload = true; + + { + getSettings().s3_max_single_part_upload_size = 1000; + getSettings().s3_min_upload_part_size = 10; + + MockS3::EventCounts counters = {.headObject = 2, .multiUploadCreate = 1, .multiUploadComplete = 1, .uploadParts = 2}; + runSimpleScenario(counters, settings.s3_max_single_part_upload_size + 1); + + counters.uploadParts = 101; + runSimpleScenario(counters, 2*settings.s3_max_single_part_upload_size); + } + + { + getSettings().s3_max_single_part_upload_size = 10; + getSettings().s3_min_upload_part_size = 1000; + + MockS3::EventCounts counters = {.headObject = 2, .multiUploadCreate = 1, .multiUploadComplete = 1, .uploadParts = 1}; + + runSimpleScenario(counters, settings.s3_max_single_part_upload_size + 1); + runSimpleScenario(counters, 2*settings.s3_max_single_part_upload_size); + runSimpleScenario(counters, settings.s3_min_upload_part_size-1); + runSimpleScenario(counters, settings.s3_min_upload_part_size); + } +} + +TEST_P(SyncAsync, bigger_file_is_multi_part_upload) { + getSettings().s3_check_objects_after_upload = true; + + { + getSettings().s3_max_single_part_upload_size = 1000; + getSettings().s3_min_upload_part_size = 10; + + auto counters = MockS3::EventCounts{.headObject = 2, .multiUploadCreate = 1, .multiUploadComplete = 1, .uploadParts = 2}; + runSimpleScenario(counters, settings.s3_max_single_part_upload_size + settings.s3_min_upload_part_size); + + counters.uploadParts = 3; + runSimpleScenario(counters, 
settings.s3_max_single_part_upload_size + settings.s3_min_upload_part_size + 1); + runSimpleScenario(counters, settings.s3_max_single_part_upload_size + 2*settings.s3_min_upload_part_size - 1); + runSimpleScenario(counters, settings.s3_max_single_part_upload_size + 2*settings.s3_min_upload_part_size); + } + + + { + // but not in that case, when s3_min_upload_part_size > s3_max_single_part_upload_size + getSettings().s3_max_single_part_upload_size = 10; + getSettings().s3_min_upload_part_size = 1000; + + auto counters = MockS3::EventCounts{.headObject = 2, .multiUploadCreate = 1, .multiUploadComplete = 1, .uploadParts = 2}; + runSimpleScenario(counters, settings.s3_max_single_part_upload_size + settings.s3_min_upload_part_size); + runSimpleScenario(counters, settings.s3_max_single_part_upload_size + settings.s3_min_upload_part_size + 1); + runSimpleScenario(counters, 2*settings.s3_min_upload_part_size-1); + runSimpleScenario(counters, 2*settings.s3_min_upload_part_size); + + counters.uploadParts = 3; + runSimpleScenario(counters, 2*settings.s3_min_upload_part_size+1); + } +} + +TEST_P(SyncAsync, increase_upload_buffer) { + getSettings().s3_check_objects_after_upload = true; + + { + getSettings().s3_max_single_part_upload_size = 10; + getSettings().s3_min_upload_part_size = 10; + getSettings().s3_upload_part_size_multiply_parts_count_threshold = 1; + // parts: 10 20 40 80 160 + // size: 10 30 70 150 310 + + auto counters = MockS3::EventCounts{.headObject = 2, .multiUploadCreate = 1, .multiUploadComplete = 1, .uploadParts = 6}; + runSimpleScenario(counters, 350); + + auto actual_parts_sizes = MockS3::BucketMemStore::GetPartSizes(getCompletedPartUploads().back().second); + ASSERT_THAT(actual_parts_sizes, testing::ElementsAre(10, 20, 40, 80, 160, 40)); + } + + { + getSettings().s3_max_single_part_upload_size = 10; + getSettings().s3_min_upload_part_size = 10; + getSettings().s3_upload_part_size_multiply_parts_count_threshold = 2; + getSettings().s3_upload_part_size_multiply_factor = 3; + // parts: 10 10 30 30 90 + // size: 10 20 50 80 170 + + auto counters = MockS3::EventCounts{.headObject = 2, .multiUploadCreate = 1, .multiUploadComplete = 1, .uploadParts = 6}; + runSimpleScenario(counters, 190); + + auto actual_parts_sizes = MockS3::BucketMemStore::GetPartSizes(getCompletedPartUploads().back().second); + ASSERT_THAT(actual_parts_sizes, testing::ElementsAre(10, 10, 30, 30, 90, 20)); + } +} + +TEST_P(SyncAsync, increase_limited) { + getSettings().s3_check_objects_after_upload = true; + + { + getSettings().s3_max_single_part_upload_size = 10; + getSettings().s3_min_upload_part_size = 10; + getSettings().s3_upload_part_size_multiply_parts_count_threshold = 1; + getSettings().s3_max_upload_part_size = 45; + // parts: 10 20 40 45 45 45 + // size: 10 30 70 115 160 205 + + auto counters = MockS3::EventCounts{.headObject = 2, .multiUploadCreate = 1, .multiUploadComplete = 1, .uploadParts = 7}; + runSimpleScenario(counters, 220); + + auto actual_parts_sizes = MockS3::BucketMemStore::GetPartSizes(getCompletedPartUploads().back().second); + ASSERT_THAT(actual_parts_sizes, testing::ElementsAre(10, 20, 40, 45, 45, 45, 15)); + } +} + +#endif diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 9adfcf7fef7..f61c1dad59f 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -290,6 +290,7 @@ void MergeTreeData::initializeDirectoriesAndFormatVersion(const std::string & re { auto buf = 
disk->writeFile(format_version_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, getContext()->getWriteSettings()); writeIntText(format_version.toUnderType(), *buf); + buf->finalize(); if (getContext()->getSettingsRef().fsync_metadata) buf->sync(); } diff --git a/src/Storages/MergeTree/MergeTreeDeduplicationLog.cpp b/src/Storages/MergeTree/MergeTreeDeduplicationLog.cpp index b843ce6a078..6c6a6ded5dd 100644 --- a/src/Storages/MergeTree/MergeTreeDeduplicationLog.cpp +++ b/src/Storages/MergeTree/MergeTreeDeduplicationLog.cpp @@ -160,7 +160,10 @@ void MergeTreeDeduplicationLog::rotate() existing_logs.emplace(current_log_number, log_description); if (current_writer) + { + current_writer->finalize(); current_writer->sync(); + } current_writer = disk->writeFile(log_description.path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append); } diff --git a/src/Storages/MergeTree/MergeTreeMutationEntry.cpp b/src/Storages/MergeTree/MergeTreeMutationEntry.cpp index 2e30a3f3986..feffffb57ea 100644 --- a/src/Storages/MergeTree/MergeTreeMutationEntry.cpp +++ b/src/Storages/MergeTree/MergeTreeMutationEntry.cpp @@ -75,6 +75,7 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskP TransactionID::write(tid, *out); *out << "\n"; } + out->finalize(); out->sync(); } catch (...) diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 00e72482a17..a4d9dc9f2e3 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -777,7 +777,6 @@ public: key, configuration_.request_settings, std::nullopt, - DBMS_DEFAULT_BUFFER_SIZE, threadPoolCallbackRunner(IOThreadPool::get(), "S3ParallelWrite"), context->getWriteSettings()), compression_method, diff --git a/tests/queries/0_stateless/02240_filesystem_query_cache.reference b/tests/queries/0_stateless/02240_filesystem_query_cache.reference index f4b9f7bb127..16c4cd1c049 100644 --- a/tests/queries/0_stateless/02240_filesystem_query_cache.reference +++ b/tests/queries/0_stateless/02240_filesystem_query_cache.reference @@ -6,6 +6,7 @@ SET skip_download_if_exceeds_query_cache=1; SET filesystem_cache_max_download_size=128; DROP TABLE IF EXISTS test; CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_4', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false; +SYSTEM DROP FILESYSTEM CACHE; INSERT INTO test SELECT number, toString(number) FROM numbers(100); SELECT * FROM test FORMAT Null; SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache ORDER BY file_segment_range_end, size; diff --git a/tests/queries/0_stateless/02240_filesystem_query_cache.sql b/tests/queries/0_stateless/02240_filesystem_query_cache.sql index 94eb4bc5ccd..44856a2188c 100644 --- a/tests/queries/0_stateless/02240_filesystem_query_cache.sql +++ b/tests/queries/0_stateless/02240_filesystem_query_cache.sql @@ -9,8 +9,8 @@ SET filesystem_cache_max_download_size=128; DROP TABLE IF EXISTS test; CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_4', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false; +SYSTEM DROP FILESYSTEM CACHE; INSERT INTO test SELECT number, toString(number) FROM numbers(100); - SELECT * FROM test FORMAT Null; SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache ORDER BY file_segment_range_end, size; SYSTEM DROP FILESYSTEM CACHE; diff --git 
a/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.reference b/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.reference index f3fac9b32d3..b3b7d12d219 100644 --- a/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.reference +++ b/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.reference @@ -1,6 +1,6 @@ Using storage policy: s3_cache 0 -0 +0 0 Row 1: ────── file_segment_range_begin: 0 @@ -8,11 +8,11 @@ file_segment_range_end: 745 size: 746 state: DOWNLOADED 8 -8 +8 1100 0 2 2 -8 +8 1100 Row 1: ────── file_segment_range_begin: 0 @@ -20,17 +20,17 @@ file_segment_range_end: 1659 size: 1660 state: DOWNLOADED 8 -8 -8 -8 -24 -35 -43 +8 2014 +8 2014 +8 2014 +24 84045 +35 168815 +44 252113 5010500 18816 Using storage policy: local_cache 0 -0 +0 0 Row 1: ────── file_segment_range_begin: 0 @@ -38,11 +38,11 @@ file_segment_range_end: 745 size: 746 state: DOWNLOADED 8 -8 +8 1100 0 2 2 -8 +8 1100 Row 1: ────── file_segment_range_begin: 0 @@ -50,11 +50,11 @@ file_segment_range_end: 1659 size: 1660 state: DOWNLOADED 8 -8 -8 -8 -24 -35 -43 +8 2014 +8 2014 +8 2014 +24 84045 +35 168815 +44 252113 5010500 18816 diff --git a/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.sh b/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.sh index 048fb792e6e..e65bf9cb35f 100755 --- a/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.sh +++ b/tests/queries/0_stateless/02241_filesystem_cache_on_write_operations.sh @@ -33,7 +33,7 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do FORMAT Vertical" $CLICKHOUSE_CLIENT --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path" - $CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache" + $CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache" $CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)" @@ -54,7 +54,7 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do FORMAT Vertical" $CLICKHOUSE_CLIENT --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path" - $CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache" + $CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache" $CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0" @@ -64,7 +64,7 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do $CLICKHOUSE_CLIENT --query "SELECT * FROM test_02241 FORMAT Null" $CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0" - $CLICKHOUSE_CLIENT --query "SELECT count() size FROM system.filesystem_cache" + $CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) size FROM system.filesystem_cache" $CLICKHOUSE_CLIENT --query "SYSTEM DROP FILESYSTEM CACHE" @@ -87,24 +87,23 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do FORMAT Vertical;" $CLICKHOUSE_CLIENT --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON 
data_paths.cache_path = caches.cache_path" - $CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache" + $CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache" - $CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache" + $CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache" $CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100) SETTINGS enable_filesystem_cache_on_write_operations=0" - $CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache" + $CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache" $CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)" $CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(300, 10000)" - $CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache" + $CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache" $CLICKHOUSE_CLIENT --query "SYSTEM START MERGES test_02241" $CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "OPTIMIZE TABLE test_02241 FINAL" - $CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache" + $CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache" $CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --mutations_sync=2 --query "ALTER TABLE test_02241 UPDATE value = 'kek' WHERE key = 100" - $CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache" - + $CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache" $CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(5000000)" $CLICKHOUSE_CLIENT --query "SYSTEM FLUSH LOGS" diff --git a/tests/queries/0_stateless/02382_filesystem_cache_persistent_files.reference b/tests/queries/0_stateless/02382_filesystem_cache_persistent_files.reference index 083f0f69dc8..e77afc98007 100644 --- a/tests/queries/0_stateless/02382_filesystem_cache_persistent_files.reference +++ b/tests/queries/0_stateless/02382_filesystem_cache_persistent_files.reference @@ -8,7 +8,7 @@ SYSTEM STOP MERGES nopers; INSERT INTO nopers SELECT number, toString(number) FROM numbers(10); SELECT * FROM nopers FORMAT Null; SELECT sum(size) FROM system.filesystem_cache; -194 +195 SELECT extract(local_path, '.*/([\w.]+)') as file, extract(cache_path, '.*/([\w.]+)') as cache, size FROM ( @@ -21,17 +21,18 @@ ON data_paths.cache_path = caches.cache_path ORDER BY file, cache, size; data.bin 0 114 data.mrk3 0 80 +format_version.txt 0 1 DROP TABLE IF EXISTS test; CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_small', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false; SYSTEM STOP MERGES test; INSERT INTO test SELECT number, toString(number) FROM numbers(100); SELECT * FROM test FORMAT Null; SELECT sum(size) FROM system.filesystem_cache; -1020 +1021 SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path; -4 +5 SELECT count() FROM 
system.filesystem_cache; -4 +5 SELECT extract(local_path, '.*/([\w.]+)') as file, extract(cache_path, '.*/([\w.]+)') as cache, size FROM ( @@ -46,17 +47,18 @@ data.bin 0 114 data.bin 0 746 data.mrk3 0 80 data.mrk3 0_persistent 80 +format_version.txt 0 1 DROP TABLE IF EXISTS test2; CREATE TABLE test2 (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_small', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false; SYSTEM STOP MERGES test2; INSERT INTO test2 SELECT number, toString(number) FROM numbers(100000); SELECT * FROM test2 FORMAT Null; SELECT sum(size) FROM system.filesystem_cache; -794 +795 SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path; -4 +5 SELECT count() FROM system.filesystem_cache; -4 +5 SELECT extract(local_path, '.*/([\w.]+)') as file, extract(cache_path, '.*/([\w.]+)') as cache, size FROM ( @@ -71,6 +73,7 @@ data.bin 0 114 data.mrk3 0 80 data.mrk3 0_persistent 80 data.mrk3 0_persistent 520 +format_version.txt 0 1 DROP TABLE test; DROP TABLE test2; DROP TABLE nopers; diff --git a/tests/queries/0_stateless/02675_profile_events_from_query_log_and_client.reference b/tests/queries/0_stateless/02675_profile_events_from_query_log_and_client.reference index 00e93b1db3d..3f34d5e2c79 100644 --- a/tests/queries/0_stateless/02675_profile_events_from_query_log_and_client.reference +++ b/tests/queries/0_stateless/02675_profile_events_from_query_log_and_client.reference @@ -1,8 +1,8 @@ INSERT TO S3 [ 0 ] S3CompleteMultipartUpload: 1 [ 0 ] S3CreateMultipartUpload: 1 - [ 0 ] S3HeadObject: 1 - [ 0 ] S3ReadRequestsCount: 1 + [ 0 ] S3HeadObject: 2 + [ 0 ] S3ReadRequestsCount: 2 [ 0 ] S3UploadPart: 1 [ 0 ] S3WriteRequestsCount: 3 CHECK WITH query_log
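
For illustration, here is a minimal standalone sketch (not taken from the patch itself) of the synchronous-runner pattern that WriteBufferFromS3::TaskTracker::syncRunner above is built on. It uses only the standard library; the names makeSyncRunner, Callback and Runner are illustrative assumptions rather than ClickHouse identifiers. It demonstrates the contract the TaskTracker relies on: the callback runs inline, and any exception is captured inside the returned std::future, so the caller only sees it when it later calls get() (as getReady()/getAll() do).

#include <functional>
#include <future>
#include <iostream>
#include <stdexcept>

using Callback = std::function<void()>;
using Runner = std::function<std::future<void>(Callback &&, int64_t)>;

// Build a runner that executes the callback immediately on the calling thread.
Runner makeSyncRunner()
{
    return [](Callback && callback, int64_t /*priority*/) -> std::future<void>
    {
        std::packaged_task<void()> task(std::move(callback));
        task();                      // runs inline; a thrown exception is stored in the future
        return task.get_future();
    };
}

int main()
{
    auto runner = makeSyncRunner();

    auto ok = runner([] { std::cout << "task ran inline\n"; }, 0);
    ok.get();                        // nothing stored, returns normally

    auto failed = runner([] { throw std::runtime_error("upload failed"); }, 0);
    try
    {
        failed.get();                // the stored exception is rethrown only here
    }
    catch (const std::exception & e)
    {
        std::cout << "caught from future: " << e.what() << '\n';
    }
    return 0;
}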