From 4b322ee3c5e7a40f9bc3bc4fec8100f33dabf750 Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 4 Sep 2024 17:12:17 +0000 Subject: [PATCH 1/6] Reduce memory usage of inserts to JSON by using adaptive write buffer size --- src/Compression/CompressedWriteBuffer.cpp | 23 +++++++++++++++++-- src/Compression/CompressedWriteBuffer.h | 11 ++++++++- src/Core/Defines.h | 3 +++ .../Serializations/ISerialization.cpp | 14 +++++++++++ src/DataTypes/Serializations/ISerialization.h | 3 +++ src/Disks/DiskLocal.cpp | 12 +++++++++- .../IO/WriteBufferFromAzureBlobStorage.cpp | 19 ++++++++++++++- .../IO/WriteBufferFromAzureBlobStorage.h | 1 + .../ObjectStorages/S3/S3ObjectStorage.cpp | 2 +- src/IO/WriteBufferFromFile.cpp | 12 ++++++---- src/IO/WriteBufferFromFile.h | 8 +++++-- src/IO/WriteBufferFromFileDescriptor.cpp | 16 +++++++++++-- src/IO/WriteBufferFromFileDescriptor.h | 10 +++++++- src/IO/WriteBufferFromS3.cpp | 12 +++++++--- src/IO/WriteSettings.h | 3 +++ .../MergeTreeDataPartWriterOnDisk.cpp | 6 ++--- .../MergeTree/MergeTreeDataPartWriterWide.cpp | 6 ++++- .../MergeTree/MergeTreeIOSettings.cpp | 2 ++ src/Storages/MergeTree/MergeTreeIOSettings.h | 2 ++ src/Storages/MergeTree/MergeTreeSettings.h | 2 ++ 20 files changed, 145 insertions(+), 22 deletions(-) diff --git a/src/Compression/CompressedWriteBuffer.cpp b/src/Compression/CompressedWriteBuffer.cpp index 83c9fbc9573..c3acfcb7da6 100644 --- a/src/Compression/CompressedWriteBuffer.cpp +++ b/src/Compression/CompressedWriteBuffer.cpp @@ -55,10 +55,29 @@ void CompressedWriteBuffer::nextImpl() out.write(compressed_buffer.data(), compressed_size); } + + /// Increase buffer size for next data if adaptive buffer size is used and nextImpl was called because of end of buffer. + if (!available() && use_adaptive_buffer_size && memory.size() < adaptive_buffer_max_size) + { + memory.resize(std::min(memory.size() * 2, adaptive_buffer_max_size)); + BufferBase::set(memory.data(), memory.size(), 0); + } } -CompressedWriteBuffer::CompressedWriteBuffer(WriteBuffer & out_, CompressionCodecPtr codec_, size_t buf_size) - : BufferWithOwnMemory(buf_size), out(out_), codec(std::move(codec_)) +void CompressedWriteBuffer::finalizeImpl() +{ + /// Don't try to resize buffer in nextImpl. + use_adaptive_buffer_size = false; + next(); +} + +CompressedWriteBuffer::CompressedWriteBuffer( + WriteBuffer & out_, CompressionCodecPtr codec_, size_t buf_size, bool use_adaptive_buffer_size_, size_t adaptive_buffer_initial_size) + : BufferWithOwnMemory(use_adaptive_buffer_size_ ? 
adaptive_buffer_initial_size : buf_size) + , out(out_) + , codec(std::move(codec_)) + , use_adaptive_buffer_size(use_adaptive_buffer_size_) + , adaptive_buffer_max_size(buf_size) { } diff --git a/src/Compression/CompressedWriteBuffer.h b/src/Compression/CompressedWriteBuffer.h index 6ae1fbee9cc..a3aae6b0c61 100644 --- a/src/Compression/CompressedWriteBuffer.h +++ b/src/Compression/CompressedWriteBuffer.h @@ -19,7 +19,9 @@ public: explicit CompressedWriteBuffer( WriteBuffer & out_, CompressionCodecPtr codec_ = CompressionCodecFactory::instance().getDefaultCodec(), - size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE); + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + bool use_adaptive_buffer_size_ = false, + size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE); ~CompressedWriteBuffer() override; @@ -45,10 +47,17 @@ public: private: void nextImpl() override; + void finalizeImpl() override; WriteBuffer & out; CompressionCodecPtr codec; + /// If true, the size of internal buffer will be exponentially increased up to + /// adaptive_buffer_max_size after each nextImpl call. It can be used to avoid + /// large buffer allocation when actual size of writen data is small. + bool use_adaptive_buffer_size; + size_t adaptive_buffer_max_size; + PODArray compressed_buffer; }; diff --git a/src/Core/Defines.h b/src/Core/Defines.h index 6df335a9c8f..629ec58a936 100644 --- a/src/Core/Defines.h +++ b/src/Core/Defines.h @@ -20,6 +20,9 @@ static constexpr auto DBMS_DEFAULT_POLL_INTERVAL = 10; /// The size of the I/O buffer by default. static constexpr auto DBMS_DEFAULT_BUFFER_SIZE = 1048576ULL; +/// The initial size of adaptive I/O buffer by default. +static constexpr auto DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE = 16384ULL; + static constexpr auto PADDING_FOR_SIMD = 64; /** Which blocks by default read the data (by number of rows). diff --git a/src/DataTypes/Serializations/ISerialization.cpp b/src/DataTypes/Serializations/ISerialization.cpp index 338edc3a144..81efa921c9e 100644 --- a/src/DataTypes/Serializations/ISerialization.cpp +++ b/src/DataTypes/Serializations/ISerialization.cpp @@ -420,6 +420,20 @@ bool ISerialization::isEphemeralSubcolumn(const DB::ISerialization::SubstreamPat return path[last_elem].type == Substream::VariantElementNullMap; } +bool ISerialization::isDynamicSubcolumn(const DB::ISerialization::SubstreamPath & path, size_t prefix_len) +{ + if (prefix_len == 0 || prefix_len > path.size()) + return false; + + for (size_t i = 0; i != prefix_len; ++i) + { + if (path[i].type == SubstreamType::DynamicData || path[i].type == SubstreamType::ObjectData) + return true; + } + + return false; +} + ISerialization::SubstreamData ISerialization::createFromPath(const SubstreamPath & path, size_t prefix_len) { assert(prefix_len <= path.size()); diff --git a/src/DataTypes/Serializations/ISerialization.h b/src/DataTypes/Serializations/ISerialization.h index 33575a07177..32f418e9132 100644 --- a/src/DataTypes/Serializations/ISerialization.h +++ b/src/DataTypes/Serializations/ISerialization.h @@ -457,6 +457,9 @@ public: /// for writing/reading data. For example, it's a null-map subcolumn of Variant type (it's always constructed from discriminators);. static bool isEphemeralSubcolumn(const SubstreamPath & path, size_t prefix_len); + /// Returns true if stream with specified path corresponds to dynamic subcolumn. 
+ static bool isDynamicSubcolumn(const SubstreamPath & path, size_t prefix_len); + protected: template State * checkAndGetState(const StatePtr & state) const; diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp index d1f0a928b1d..03b8140ce5c 100644 --- a/src/Disks/DiskLocal.cpp +++ b/src/Disks/DiskLocal.cpp @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include @@ -339,7 +341,15 @@ DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode, const { int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1; return std::make_unique( - fs::path(disk_path) / path, buf_size, flags, settings.local_throttler); + fs::path(disk_path) / path, + buf_size, + flags, + settings.local_throttler, + 0666, + nullptr, + 0, + settings.use_adaptive_write_buffer, + settings.adaptive_write_buffer_initial_size); } std::vector DiskLocal::getBlobPath(const String & path) const diff --git a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp index 60fa2997c50..dcd625f7ee0 100644 --- a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp +++ b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp @@ -251,12 +251,29 @@ void WriteBufferFromAzureBlobStorage::allocateBuffer() auto size = buffer_allocation_policy->getBufferSize(); if (buffer_allocation_policy->getBufferNumber() == 1) - size = std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), size); + { + allocateFirstBuffer(); + return; + } memory = Memory(size); WriteBuffer::set(memory.data(), memory.size()); } +void WriteBufferFromAzureBlobStorage::allocateFirstBuffer() +{ + /// First buffer was already allocated in BufferWithOwnMemory constructor with buffer size provided in constructor. + /// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy. + /// But it may happen that buffer size provided in constructor is larger then desired buffer size from buffer_allocation_policy. + /// Resize memory in this case to the desired size. + const auto max_first_buffer = buffer_allocation_policy->getBufferSize(); + if (memory.size() > max_first_buffer) + { + memory.resize(max_first_buffer); + WriteBuffer::set(memory.data(), memory.size()); + } +} + void WriteBufferFromAzureBlobStorage::detachBuffer() { size_t data_size = size_t(position() - memory.data()); diff --git a/src/Disks/IO/WriteBufferFromAzureBlobStorage.h b/src/Disks/IO/WriteBufferFromAzureBlobStorage.h index 3ee497c4e44..c2d65928cfa 100644 --- a/src/Disks/IO/WriteBufferFromAzureBlobStorage.h +++ b/src/Disks/IO/WriteBufferFromAzureBlobStorage.h @@ -53,6 +53,7 @@ private: void detachBuffer(); void reallocateFirstBuffer(); void allocateBuffer(); + void allocateFirstBuffer(); void hidePartialData(); void setFakeBufferWhenPreFinalized(); diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp index 8de80971238..f26a3a8bd9d 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp @@ -282,7 +282,7 @@ std::unique_ptr S3ObjectStorage::writeObject( /// NOLIN client.get(), uri.bucket, object.remote_path, - buf_size, + write_settings.use_adaptive_write_buffer ? 
write_settings.adaptive_write_buffer_initial_size : buf_size, request_settings, std::move(blob_storage_log), attributes, diff --git a/src/IO/WriteBufferFromFile.cpp b/src/IO/WriteBufferFromFile.cpp index f1825ce1e22..d68203029c1 100644 --- a/src/IO/WriteBufferFromFile.cpp +++ b/src/IO/WriteBufferFromFile.cpp @@ -32,8 +32,10 @@ WriteBufferFromFile::WriteBufferFromFile( ThrottlerPtr throttler_, mode_t mode, char * existing_memory, - size_t alignment) - : WriteBufferFromFileDescriptor(-1, buf_size, existing_memory, throttler_, alignment, file_name_) + size_t alignment, + bool use_adaptive_buffer_size_, + size_t adaptive_buffer_initial_size) + : WriteBufferFromFileDescriptor(-1, buf_size, existing_memory, throttler_, alignment, file_name_, use_adaptive_buffer_size_, adaptive_buffer_initial_size) { ProfileEvents::increment(ProfileEvents::FileOpen); @@ -66,8 +68,10 @@ WriteBufferFromFile::WriteBufferFromFile( size_t buf_size, ThrottlerPtr throttler_, char * existing_memory, - size_t alignment) - : WriteBufferFromFileDescriptor(fd_, buf_size, existing_memory, throttler_, alignment, original_file_name) + size_t alignment, + bool use_adaptive_buffer_size_, + size_t adaptive_buffer_initial_size) + : WriteBufferFromFileDescriptor(fd_, buf_size, existing_memory, throttler_, alignment, original_file_name, use_adaptive_buffer_size_, adaptive_buffer_initial_size) { fd_ = -1; } diff --git a/src/IO/WriteBufferFromFile.h b/src/IO/WriteBufferFromFile.h index 57847d893af..c0fa7f0b233 100644 --- a/src/IO/WriteBufferFromFile.h +++ b/src/IO/WriteBufferFromFile.h @@ -36,7 +36,9 @@ public: ThrottlerPtr throttler_ = {}, mode_t mode = 0666, char * existing_memory = nullptr, - size_t alignment = 0); + size_t alignment = 0, + bool use_adaptive_buffer_size_ = false, + size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE); /// Use pre-opened file descriptor. explicit WriteBufferFromFile( @@ -45,7 +47,9 @@ public: size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, ThrottlerPtr throttler_ = {}, char * existing_memory = nullptr, - size_t alignment = 0); + size_t alignment = 0, + bool use_adaptive_buffer_size_ = false, + size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE); ~WriteBufferFromFile() override; diff --git a/src/IO/WriteBufferFromFileDescriptor.cpp b/src/IO/WriteBufferFromFileDescriptor.cpp index f1207edc55b..b60a792e11c 100644 --- a/src/IO/WriteBufferFromFileDescriptor.cpp +++ b/src/IO/WriteBufferFromFileDescriptor.cpp @@ -83,6 +83,13 @@ void WriteBufferFromFileDescriptor::nextImpl() ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written); + + /// Increase buffer size for next data if adaptive buffer size is used and nextImpl was called because of end of buffer. 
+ if (!available() && use_adaptive_buffer_size && memory.size() < adaptive_max_buffer_size) + { + memory.resize(std::min(memory.size() * 2, adaptive_max_buffer_size)); + BufferBase::set(memory.data(), memory.size(), 0); + } } /// NOTE: This class can be used as a very low-level building block, for example @@ -94,11 +101,15 @@ WriteBufferFromFileDescriptor::WriteBufferFromFileDescriptor( char * existing_memory, ThrottlerPtr throttler_, size_t alignment, - std::string file_name_) - : WriteBufferFromFileBase(buf_size, existing_memory, alignment) + std::string file_name_, + bool use_adaptive_buffer_size_, + size_t adaptive_buffer_initial_size) + : WriteBufferFromFileBase(use_adaptive_buffer_size_ ? adaptive_buffer_initial_size : buf_size, existing_memory, alignment) , fd(fd_) , throttler(throttler_) , file_name(std::move(file_name_)) + , use_adaptive_buffer_size(use_adaptive_buffer_size_) + , adaptive_max_buffer_size(buf_size) { } @@ -124,6 +135,7 @@ void WriteBufferFromFileDescriptor::finalizeImpl() return; } + use_adaptive_buffer_size = false; next(); } diff --git a/src/IO/WriteBufferFromFileDescriptor.h b/src/IO/WriteBufferFromFileDescriptor.h index cb73b1e1d08..1008c7cd8d9 100644 --- a/src/IO/WriteBufferFromFileDescriptor.h +++ b/src/IO/WriteBufferFromFileDescriptor.h @@ -18,7 +18,9 @@ public: char * existing_memory = nullptr, ThrottlerPtr throttler_ = {}, size_t alignment = 0, - std::string file_name_ = ""); + std::string file_name_ = "", + bool use_adaptive_buffer_size_ = false, + size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE); /** Could be used before initialization if needed 'fd' was not passed to constructor. * It's not possible to change 'fd' during work. @@ -56,6 +58,12 @@ protected: /// If file has name contains filename, otherwise contains string "(fd=...)" std::string file_name; + /// If true, the size of internal buffer will be exponentially increased up to + /// adaptive_buffer_max_size after each nextImpl call. It can be used to avoid + /// large buffer allocation when actual size of writen data is small. + bool use_adaptive_buffer_size; + size_t adaptive_max_buffer_size; + void finalizeImpl() override; }; diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index e702b4d35ad..d88e393e4e1 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -364,10 +364,16 @@ void WriteBufferFromS3::allocateBuffer() void WriteBufferFromS3::allocateFirstBuffer() { + /// First buffer was already allocated in BufferWithOwnMemory constructor with provided in constructor buffer size. + /// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy. + /// But it may happen that buffer size provided in constructor is larger then desired buffer size from buffer_allocation_policy. + /// Resize memory in this case to the desired size. 
const auto max_first_buffer = buffer_allocation_policy->getBufferSize(); - const auto size = std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), max_first_buffer); - memory = Memory(size); - WriteBuffer::set(memory.data(), memory.size()); + if (memory.size() > max_first_buffer) + { + memory.resize(max_first_buffer); + WriteBuffer::set(memory.data(), memory.size()); + } } void WriteBufferFromS3::setFakeBufferWhenPreFinalized() diff --git a/src/IO/WriteSettings.h b/src/IO/WriteSettings.h index 84bb25439b5..ce78a3fd26b 100644 --- a/src/IO/WriteSettings.h +++ b/src/IO/WriteSettings.h @@ -25,6 +25,9 @@ struct WriteSettings bool s3_allow_parallel_part_upload = true; bool azure_allow_parallel_part_upload = true; + bool use_adaptive_write_buffer = false; + size_t adaptive_write_buffer_initial_size = 16 * 1024; + bool operator==(const WriteSettings & other) const = default; }; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp index b0e70e94b73..9bfc87135d9 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp @@ -85,11 +85,11 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream( marks_file_extension{marks_file_extension_}, plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)), plain_hashing(*plain_file), - compressor(plain_hashing, compression_codec_, max_compress_block_size_), + compressor(plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size), compressed_hashing(compressor), marks_file(data_part_storage->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)), marks_hashing(*marks_file), - marks_compressor(marks_hashing, marks_compression_codec_, marks_compress_block_size_), + marks_compressor(marks_hashing, marks_compression_codec_, marks_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size), marks_compressed_hashing(marks_compressor), compress_marks(MarkType(marks_file_extension).compressed) { @@ -108,7 +108,7 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream( data_file_extension{data_file_extension_}, plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)), plain_hashing(*plain_file), - compressor(plain_hashing, compression_codec_, max_compress_block_size_), + compressor(plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size), compressed_hashing(compressor), compress_marks(false) { diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 8b6735e0fe2..f050accd7a1 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -177,6 +177,10 @@ void MergeTreeDataPartWriterWide::addStreams( if (!max_compress_block_size) max_compress_block_size = settings.max_compress_block_size; + WriteSettings query_write_settings = settings.query_write_settings; + query_write_settings.use_adaptive_write_buffer = settings.use_adaptive_write_buffer_for_dynamic_subcolumns && ISerialization::isDynamicSubcolumn(substream_path, substream_path.size()); + 
query_write_settings.adaptive_write_buffer_initial_size = settings.adaptive_write_buffer_initial_size; + column_streams[stream_name] = std::make_unique>( stream_name, data_part_storage, @@ -186,7 +190,7 @@ void MergeTreeDataPartWriterWide::addStreams( max_compress_block_size, marks_compression_codec, settings.marks_compress_block_size, - settings.query_write_settings); + query_write_settings); full_name_to_stream_name.emplace(full_stream_name, stream_name); stream_name_to_full_name.emplace(stream_name, full_stream_name); diff --git a/src/Storages/MergeTree/MergeTreeIOSettings.cpp b/src/Storages/MergeTree/MergeTreeIOSettings.cpp index 24cb25afe47..19365a90a14 100644 --- a/src/Storages/MergeTree/MergeTreeIOSettings.cpp +++ b/src/Storages/MergeTree/MergeTreeIOSettings.cpp @@ -30,6 +30,8 @@ MergeTreeWriterSettings::MergeTreeWriterSettings( , low_cardinality_max_dictionary_size(global_settings.low_cardinality_max_dictionary_size) , low_cardinality_use_single_dictionary_for_part(global_settings.low_cardinality_use_single_dictionary_for_part != 0) , use_compact_variant_discriminators_serialization(storage_settings->use_compact_variant_discriminators_serialization) + , use_adaptive_write_buffer_for_dynamic_subcolumns(storage_settings->use_adaptive_write_buffer_for_dynamic_subcolumns) + , adaptive_write_buffer_initial_size(storage_settings->adaptive_write_buffer_initial_size) { } diff --git a/src/Storages/MergeTree/MergeTreeIOSettings.h b/src/Storages/MergeTree/MergeTreeIOSettings.h index 47b174b2e29..fcc72815d8f 100644 --- a/src/Storages/MergeTree/MergeTreeIOSettings.h +++ b/src/Storages/MergeTree/MergeTreeIOSettings.h @@ -80,6 +80,8 @@ struct MergeTreeWriterSettings size_t low_cardinality_max_dictionary_size; bool low_cardinality_use_single_dictionary_for_part; bool use_compact_variant_discriminators_serialization; + bool use_adaptive_write_buffer_for_dynamic_subcolumns; + size_t adaptive_write_buffer_initial_size; }; } diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 0769b60dc6b..dcb18155114 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -99,6 +99,8 @@ struct Settings; M(Bool, add_implicit_sign_column_constraint_for_collapsing_engine, false, "If true, add implicit constraint for sign column for CollapsingMergeTree engine.", 0) \ M(Milliseconds, sleep_before_commit_local_part_in_replicated_table_ms, 0, "For testing. Do not change it.", 0) \ M(Bool, optimize_row_order, false, "Allow reshuffling of rows during part inserts and merges to improve the compressibility of the new part", 0) \ + M(Bool, use_adaptive_write_buffer_for_dynamic_subcolumns, true, "Allow to use adaptive writer buffers during writing dynamic subcolumns to reduce memory usage", 0) \ + M(UInt64, adaptive_write_buffer_initial_size, 16 * 1024, "Initial size of an adaptive write buffer", 0) \ \ /* Part removal settings. 
*/ \ M(UInt64, simultaneous_parts_removal_limit, 0, "Maximum number of parts to remove during one CleanupThread iteration (0 means unlimited).", 0) \ From 68a8b5a3a1e70469ff47f81669a5b25d1d40fb96 Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 4 Sep 2024 18:22:20 +0000 Subject: [PATCH 2/6] Better --- src/Disks/DiskLocal.cpp | 2 -- .../ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp index 03b8140ce5c..12a5b615234 100644 --- a/src/Disks/DiskLocal.cpp +++ b/src/Disks/DiskLocal.cpp @@ -23,8 +23,6 @@ #include #include #include -#include -#include #include diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp index f85b5f45b37..fa48825e1a6 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp @@ -289,7 +289,7 @@ std::unique_ptr AzureObjectStorage::writeObject( /// NO return std::make_unique( client.get(), object.remote_path, - buf_size, + write_settings.use_adaptive_write_buffer ? write_settings.adaptive_write_buffer_initial_size : buf_size, patchSettings(write_settings), settings.get(), std::move(scheduler)); From 0dad8b088a0452831c89b88b470b8df963fd741a Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 4 Sep 2024 18:35:10 +0000 Subject: [PATCH 3/6] Fix typo --- src/Compression/CompressedWriteBuffer.h | 2 +- src/IO/WriteBufferFromFileDescriptor.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Compression/CompressedWriteBuffer.h b/src/Compression/CompressedWriteBuffer.h index a3aae6b0c61..41596703bfe 100644 --- a/src/Compression/CompressedWriteBuffer.h +++ b/src/Compression/CompressedWriteBuffer.h @@ -54,7 +54,7 @@ private: /// If true, the size of internal buffer will be exponentially increased up to /// adaptive_buffer_max_size after each nextImpl call. It can be used to avoid - /// large buffer allocation when actual size of writen data is small. + /// large buffer allocation when actual size of written data is small. bool use_adaptive_buffer_size; size_t adaptive_buffer_max_size; diff --git a/src/IO/WriteBufferFromFileDescriptor.h b/src/IO/WriteBufferFromFileDescriptor.h index 1008c7cd8d9..e893ecd80fb 100644 --- a/src/IO/WriteBufferFromFileDescriptor.h +++ b/src/IO/WriteBufferFromFileDescriptor.h @@ -60,7 +60,7 @@ protected: /// If true, the size of internal buffer will be exponentially increased up to /// adaptive_buffer_max_size after each nextImpl call. It can be used to avoid - /// large buffer allocation when actual size of writen data is small. + /// large buffer allocation when actual size of written data is small. 
bool use_adaptive_buffer_size; size_t adaptive_max_buffer_size; From b7b88737ad996b43e27c22e697b9a947fff2a3c0 Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 4 Sep 2024 18:37:33 +0000 Subject: [PATCH 4/6] Treat dynamic/object structure streams as dynamic --- src/DataTypes/Serializations/ISerialization.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/DataTypes/Serializations/ISerialization.cpp b/src/DataTypes/Serializations/ISerialization.cpp index 81efa921c9e..dcf637c7d2b 100644 --- a/src/DataTypes/Serializations/ISerialization.cpp +++ b/src/DataTypes/Serializations/ISerialization.cpp @@ -427,7 +427,8 @@ bool ISerialization::isDynamicSubcolumn(const DB::ISerialization::SubstreamPath for (size_t i = 0; i != prefix_len; ++i) { - if (path[i].type == SubstreamType::DynamicData || path[i].type == SubstreamType::ObjectData) + if (path[i].type == SubstreamType::DynamicData || path[i].type == SubstreamType::DynamicStructure + || path[i].type == SubstreamType::ObjectData || path[i].type == SubstreamType::ObjectStructure) return true; } From b4ef10ad1c075b5c16cafb911f67d4623cd40ceb Mon Sep 17 00:00:00 2001 From: avogar Date: Thu, 5 Sep 2024 08:52:22 +0000 Subject: [PATCH 5/6] Make better --- .../IO/WriteBufferFromAzureBlobStorage.cpp | 30 +++++-------------- .../IO/WriteBufferFromAzureBlobStorage.h | 1 - src/IO/WriteBufferFromS3.cpp | 21 ++----------- src/IO/WriteBufferFromS3.h | 1 - 4 files changed, 10 insertions(+), 43 deletions(-) diff --git a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp index dcd625f7ee0..cbcfe0bdb97 100644 --- a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp +++ b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp @@ -59,7 +59,7 @@ WriteBufferFromAzureBlobStorage::WriteBufferFromAzureBlobStorage( const WriteSettings & write_settings_, std::shared_ptr settings_, ThreadPoolCallbackRunnerUnsafe schedule_) - : WriteBufferFromFileBase(buf_size_, nullptr, 0) + : WriteBufferFromFileBase(std::min(buf_size_, static_cast(DBMS_DEFAULT_BUFFER_SIZE)), nullptr, 0) , log(getLogger("WriteBufferFromAzureBlobStorage")) , buffer_allocation_policy(createBufferAllocationPolicy(*settings_)) , max_single_part_upload_size(settings_->max_single_part_upload_size) @@ -248,30 +248,14 @@ void WriteBufferFromAzureBlobStorage::allocateBuffer() buffer_allocation_policy->nextBuffer(); chassert(0 == hidden_size); - auto size = buffer_allocation_policy->getBufferSize(); - - if (buffer_allocation_policy->getBufferNumber() == 1) - { - allocateFirstBuffer(); - return; - } - - memory = Memory(size); - WriteBuffer::set(memory.data(), memory.size()); -} - -void WriteBufferFromAzureBlobStorage::allocateFirstBuffer() -{ /// First buffer was already allocated in BufferWithOwnMemory constructor with buffer size provided in constructor. /// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy. - /// But it may happen that buffer size provided in constructor is larger then desired buffer size from buffer_allocation_policy. - /// Resize memory in this case to the desired size. 
- const auto max_first_buffer = buffer_allocation_policy->getBufferSize(); - if (memory.size() > max_first_buffer) - { - memory.resize(max_first_buffer); - WriteBuffer::set(memory.data(), memory.size()); - } + if (buffer_allocation_policy->getBufferNumber() == 1) + return; + + auto size = buffer_allocation_policy->getBufferSize(); + memory = Memory(size); + WriteBuffer::set(memory.data(), memory.size()); } void WriteBufferFromAzureBlobStorage::detachBuffer() diff --git a/src/Disks/IO/WriteBufferFromAzureBlobStorage.h b/src/Disks/IO/WriteBufferFromAzureBlobStorage.h index c2d65928cfa..3ee497c4e44 100644 --- a/src/Disks/IO/WriteBufferFromAzureBlobStorage.h +++ b/src/Disks/IO/WriteBufferFromAzureBlobStorage.h @@ -53,7 +53,6 @@ private: void detachBuffer(); void reallocateFirstBuffer(); void allocateBuffer(); - void allocateFirstBuffer(); void hidePartialData(); void setFakeBufferWhenPreFinalized(); diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index d88e393e4e1..424708cf2c8 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -96,7 +96,7 @@ WriteBufferFromS3::WriteBufferFromS3( std::optional> object_metadata_, ThreadPoolCallbackRunnerUnsafe schedule_, const WriteSettings & write_settings_) - : WriteBufferFromFileBase(buf_size_, nullptr, 0) + : WriteBufferFromFileBase(std::min(buf_size_, static_cast(DBMS_DEFAULT_BUFFER_SIZE)), nullptr, 0) , bucket(bucket_) , key(key_) , request_settings(request_settings_) @@ -352,30 +352,15 @@ void WriteBufferFromS3::allocateBuffer() buffer_allocation_policy->nextBuffer(); chassert(0 == hidden_size); + /// First buffer was already allocated in BufferWithOwnMemory constructor with provided in constructor buffer size. + /// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy. if (buffer_allocation_policy->getBufferNumber() == 1) - { - allocateFirstBuffer(); return; - } memory = Memory(buffer_allocation_policy->getBufferSize()); WriteBuffer::set(memory.data(), memory.size()); } -void WriteBufferFromS3::allocateFirstBuffer() -{ - /// First buffer was already allocated in BufferWithOwnMemory constructor with provided in constructor buffer size. - /// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy. - /// But it may happen that buffer size provided in constructor is larger then desired buffer size from buffer_allocation_policy. - /// Resize memory in this case to the desired size. 
- const auto max_first_buffer = buffer_allocation_policy->getBufferSize(); - if (memory.size() > max_first_buffer) - { - memory.resize(max_first_buffer); - WriteBuffer::set(memory.data(), memory.size()); - } -} - void WriteBufferFromS3::setFakeBufferWhenPreFinalized() { WriteBuffer::set(fake_buffer_when_prefinalized, sizeof(fake_buffer_when_prefinalized)); diff --git a/src/IO/WriteBufferFromS3.h b/src/IO/WriteBufferFromS3.h index b026da607c5..604f036fcb8 100644 --- a/src/IO/WriteBufferFromS3.h +++ b/src/IO/WriteBufferFromS3.h @@ -64,7 +64,6 @@ private: void reallocateFirstBuffer(); void detachBuffer(); void allocateBuffer(); - void allocateFirstBuffer(); void setFakeBufferWhenPreFinalized(); S3::UploadPartRequest getUploadRequest(size_t part_number, PartData & data); From 72dbc8205bbc1711a39d127590dfa12b3d05c96a Mon Sep 17 00:00:00 2001 From: avogar Date: Mon, 9 Sep 2024 15:12:17 +0000 Subject: [PATCH 6/6] Fix unit tests --- src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp | 9 +++++++++ src/IO/WriteBufferFromS3.cpp | 9 +++++++++ 2 files changed, 18 insertions(+) diff --git a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp index cbcfe0bdb97..229b36a05f6 100644 --- a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp +++ b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp @@ -251,7 +251,16 @@ void WriteBufferFromAzureBlobStorage::allocateBuffer() /// First buffer was already allocated in BufferWithOwnMemory constructor with buffer size provided in constructor. /// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy. if (buffer_allocation_policy->getBufferNumber() == 1) + { + /// Reduce memory size if initial size was larger then desired size from buffer_allocation_policy. + /// Usually it doesn't happen but we have it in unit tests. + if (memory.size() > buffer_allocation_policy->getBufferSize()) + { + memory.resize(buffer_allocation_policy->getBufferSize()); + WriteBuffer::set(memory.data(), memory.size()); + } return; + } auto size = buffer_allocation_policy->getBufferSize(); memory = Memory(size); diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index 424708cf2c8..3a4ab3ee882 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -355,7 +355,16 @@ void WriteBufferFromS3::allocateBuffer() /// First buffer was already allocated in BufferWithOwnMemory constructor with provided in constructor buffer size. /// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy. if (buffer_allocation_policy->getBufferNumber() == 1) + { + /// Reduce memory size if initial size was larger then desired size from buffer_allocation_policy. + /// Usually it doesn't happen but we have it in unit tests. + if (memory.size() > buffer_allocation_policy->getBufferSize()) + { + memory.resize(buffer_allocation_policy->getBufferSize()); + WriteBuffer::set(memory.data(), memory.size()); + } return; + } memory = Memory(buffer_allocation_policy->getBufferSize()); WriteBuffer::set(memory.data(), memory.size());
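
The mechanism the whole series builds on can be shown in a short standalone sketch. This is an illustration only, assuming nothing beyond the C++ standard library: the class name AdaptiveWriteBuffer and its methods are invented for the example and are not the ClickHouse classes touched above. The buffer starts at a small initial size and doubles on every flush that happened because it ran full, never exceeding a configured maximum, which mirrors the nextImpl changes in CompressedWriteBuffer and WriteBufferFromFileDescriptor.

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

/// Standalone sketch of the adaptive write buffer idea: grow geometrically on
/// demand instead of allocating the maximum buffer size up front.
class AdaptiveWriteBuffer
{
public:
    AdaptiveWriteBuffer(size_t initial_size, size_t max_size)
        : max_size_(max_size), memory_(initial_size) {}

    void write(const char * data, size_t size)
    {
        while (size > 0)
        {
            size_t available = memory_.size() - pos_;
            if (available == 0)
            {
                flush(/*buffer_was_full=*/true);
                continue;
            }
            size_t to_copy = std::min(size, available);
            std::copy(data, data + to_copy, memory_.data() + pos_);
            pos_ += to_copy;
            data += to_copy;
            size -= to_copy;
        }
    }

    void finalize() { flush(/*buffer_was_full=*/false); }

private:
    void flush(bool buffer_was_full)
    {
        if (pos_ > 0)
        {
            /// A real implementation would compress and/or write memory_[0..pos_)
            /// to the underlying output here; the sketch only reports the sizes.
            std::printf("flushed %zu bytes (buffer capacity %zu)\n", pos_, memory_.size());
            pos_ = 0;
        }

        /// Grow only when the flush was caused by the buffer being exhausted,
        /// mirroring the `!available()` check in the patched nextImpl methods.
        if (buffer_was_full && memory_.size() < max_size_)
            memory_.resize(std::min(memory_.size() * 2, max_size_));
    }

    size_t max_size_;
    std::vector<char> memory_;
    size_t pos_ = 0;
};

int main()
{
    /// 16 KiB initial size and a 1 MiB cap match the defaults introduced by the
    /// series (DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE and DBMS_DEFAULT_BUFFER_SIZE).
    AdaptiveWriteBuffer buf(16 * 1024, 1024 * 1024);
    std::vector<char> payload(100 * 1024, 'x');
    buf.write(payload.data(), payload.size());
    buf.finalize();
}

Because growth is geometric, the number of reallocations is logarithmic in the amount of data actually written, and streams that only ever receive a few kilobytes (the common case for the many small dynamic/JSON subcolumn streams this series targets) never pay for a full 1 MiB buffer per stream.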