diff --git a/src/Compression/CompressedWriteBuffer.cpp b/src/Compression/CompressedWriteBuffer.cpp index 83c9fbc9573..c3acfcb7da6 100644 --- a/src/Compression/CompressedWriteBuffer.cpp +++ b/src/Compression/CompressedWriteBuffer.cpp @@ -55,10 +55,29 @@ void CompressedWriteBuffer::nextImpl() out.write(compressed_buffer.data(), compressed_size); } + + /// Increase buffer size for next data if adaptive buffer size is used and nextImpl was called because of end of buffer. + if (!available() && use_adaptive_buffer_size && memory.size() < adaptive_buffer_max_size) + { + memory.resize(std::min(memory.size() * 2, adaptive_buffer_max_size)); + BufferBase::set(memory.data(), memory.size(), 0); + } } -CompressedWriteBuffer::CompressedWriteBuffer(WriteBuffer & out_, CompressionCodecPtr codec_, size_t buf_size) - : BufferWithOwnMemory(buf_size), out(out_), codec(std::move(codec_)) +void CompressedWriteBuffer::finalizeImpl() +{ + /// Don't try to resize buffer in nextImpl. + use_adaptive_buffer_size = false; + next(); +} + +CompressedWriteBuffer::CompressedWriteBuffer( + WriteBuffer & out_, CompressionCodecPtr codec_, size_t buf_size, bool use_adaptive_buffer_size_, size_t adaptive_buffer_initial_size) + : BufferWithOwnMemory(use_adaptive_buffer_size_ ? adaptive_buffer_initial_size : buf_size) + , out(out_) + , codec(std::move(codec_)) + , use_adaptive_buffer_size(use_adaptive_buffer_size_) + , adaptive_buffer_max_size(buf_size) { } diff --git a/src/Compression/CompressedWriteBuffer.h b/src/Compression/CompressedWriteBuffer.h index 6ae1fbee9cc..a3aae6b0c61 100644 --- a/src/Compression/CompressedWriteBuffer.h +++ b/src/Compression/CompressedWriteBuffer.h @@ -19,7 +19,9 @@ public: explicit CompressedWriteBuffer( WriteBuffer & out_, CompressionCodecPtr codec_ = CompressionCodecFactory::instance().getDefaultCodec(), - size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE); + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + bool use_adaptive_buffer_size_ = false, + size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE); ~CompressedWriteBuffer() override; @@ -45,10 +47,17 @@ public: private: void nextImpl() override; + void finalizeImpl() override; WriteBuffer & out; CompressionCodecPtr codec; + /// If true, the size of internal buffer will be exponentially increased up to + /// adaptive_buffer_max_size after each nextImpl call. It can be used to avoid + /// large buffer allocation when actual size of writen data is small. + bool use_adaptive_buffer_size; + size_t adaptive_buffer_max_size; + PODArray compressed_buffer; }; diff --git a/src/Core/Defines.h b/src/Core/Defines.h index 6df335a9c8f..629ec58a936 100644 --- a/src/Core/Defines.h +++ b/src/Core/Defines.h @@ -20,6 +20,9 @@ static constexpr auto DBMS_DEFAULT_POLL_INTERVAL = 10; /// The size of the I/O buffer by default. static constexpr auto DBMS_DEFAULT_BUFFER_SIZE = 1048576ULL; +/// The initial size of adaptive I/O buffer by default. +static constexpr auto DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE = 16384ULL; + static constexpr auto PADDING_FOR_SIMD = 64; /** Which blocks by default read the data (by number of rows). diff --git a/src/DataTypes/Serializations/ISerialization.cpp b/src/DataTypes/Serializations/ISerialization.cpp index 338edc3a144..81efa921c9e 100644 --- a/src/DataTypes/Serializations/ISerialization.cpp +++ b/src/DataTypes/Serializations/ISerialization.cpp @@ -420,6 +420,20 @@ bool ISerialization::isEphemeralSubcolumn(const DB::ISerialization::SubstreamPat return path[last_elem].type == Substream::VariantElementNullMap; } +bool ISerialization::isDynamicSubcolumn(const DB::ISerialization::SubstreamPath & path, size_t prefix_len) +{ + if (prefix_len == 0 || prefix_len > path.size()) + return false; + + for (size_t i = 0; i != prefix_len; ++i) + { + if (path[i].type == SubstreamType::DynamicData || path[i].type == SubstreamType::ObjectData) + return true; + } + + return false; +} + ISerialization::SubstreamData ISerialization::createFromPath(const SubstreamPath & path, size_t prefix_len) { assert(prefix_len <= path.size()); diff --git a/src/DataTypes/Serializations/ISerialization.h b/src/DataTypes/Serializations/ISerialization.h index 33575a07177..32f418e9132 100644 --- a/src/DataTypes/Serializations/ISerialization.h +++ b/src/DataTypes/Serializations/ISerialization.h @@ -457,6 +457,9 @@ public: /// for writing/reading data. For example, it's a null-map subcolumn of Variant type (it's always constructed from discriminators);. static bool isEphemeralSubcolumn(const SubstreamPath & path, size_t prefix_len); + /// Returns true if stream with specified path corresponds to dynamic subcolumn. + static bool isDynamicSubcolumn(const SubstreamPath & path, size_t prefix_len); + protected: template State * checkAndGetState(const StatePtr & state) const; diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp index d1f0a928b1d..03b8140ce5c 100644 --- a/src/Disks/DiskLocal.cpp +++ b/src/Disks/DiskLocal.cpp @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include @@ -339,7 +341,15 @@ DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode, const { int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1; return std::make_unique( - fs::path(disk_path) / path, buf_size, flags, settings.local_throttler); + fs::path(disk_path) / path, + buf_size, + flags, + settings.local_throttler, + 0666, + nullptr, + 0, + settings.use_adaptive_write_buffer, + settings.adaptive_write_buffer_initial_size); } std::vector DiskLocal::getBlobPath(const String & path) const diff --git a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp index 60fa2997c50..dcd625f7ee0 100644 --- a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp +++ b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp @@ -251,12 +251,29 @@ void WriteBufferFromAzureBlobStorage::allocateBuffer() auto size = buffer_allocation_policy->getBufferSize(); if (buffer_allocation_policy->getBufferNumber() == 1) - size = std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), size); + { + allocateFirstBuffer(); + return; + } memory = Memory(size); WriteBuffer::set(memory.data(), memory.size()); } +void WriteBufferFromAzureBlobStorage::allocateFirstBuffer() +{ + /// First buffer was already allocated in BufferWithOwnMemory constructor with buffer size provided in constructor. + /// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy. + /// But it may happen that buffer size provided in constructor is larger then desired buffer size from buffer_allocation_policy. + /// Resize memory in this case to the desired size. + const auto max_first_buffer = buffer_allocation_policy->getBufferSize(); + if (memory.size() > max_first_buffer) + { + memory.resize(max_first_buffer); + WriteBuffer::set(memory.data(), memory.size()); + } +} + void WriteBufferFromAzureBlobStorage::detachBuffer() { size_t data_size = size_t(position() - memory.data()); diff --git a/src/Disks/IO/WriteBufferFromAzureBlobStorage.h b/src/Disks/IO/WriteBufferFromAzureBlobStorage.h index 3ee497c4e44..c2d65928cfa 100644 --- a/src/Disks/IO/WriteBufferFromAzureBlobStorage.h +++ b/src/Disks/IO/WriteBufferFromAzureBlobStorage.h @@ -53,6 +53,7 @@ private: void detachBuffer(); void reallocateFirstBuffer(); void allocateBuffer(); + void allocateFirstBuffer(); void hidePartialData(); void setFakeBufferWhenPreFinalized(); diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp index 8de80971238..f26a3a8bd9d 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp @@ -282,7 +282,7 @@ std::unique_ptr S3ObjectStorage::writeObject( /// NOLIN client.get(), uri.bucket, object.remote_path, - buf_size, + write_settings.use_adaptive_write_buffer ? write_settings.adaptive_write_buffer_initial_size : buf_size, request_settings, std::move(blob_storage_log), attributes, diff --git a/src/IO/WriteBufferFromFile.cpp b/src/IO/WriteBufferFromFile.cpp index f1825ce1e22..d68203029c1 100644 --- a/src/IO/WriteBufferFromFile.cpp +++ b/src/IO/WriteBufferFromFile.cpp @@ -32,8 +32,10 @@ WriteBufferFromFile::WriteBufferFromFile( ThrottlerPtr throttler_, mode_t mode, char * existing_memory, - size_t alignment) - : WriteBufferFromFileDescriptor(-1, buf_size, existing_memory, throttler_, alignment, file_name_) + size_t alignment, + bool use_adaptive_buffer_size_, + size_t adaptive_buffer_initial_size) + : WriteBufferFromFileDescriptor(-1, buf_size, existing_memory, throttler_, alignment, file_name_, use_adaptive_buffer_size_, adaptive_buffer_initial_size) { ProfileEvents::increment(ProfileEvents::FileOpen); @@ -66,8 +68,10 @@ WriteBufferFromFile::WriteBufferFromFile( size_t buf_size, ThrottlerPtr throttler_, char * existing_memory, - size_t alignment) - : WriteBufferFromFileDescriptor(fd_, buf_size, existing_memory, throttler_, alignment, original_file_name) + size_t alignment, + bool use_adaptive_buffer_size_, + size_t adaptive_buffer_initial_size) + : WriteBufferFromFileDescriptor(fd_, buf_size, existing_memory, throttler_, alignment, original_file_name, use_adaptive_buffer_size_, adaptive_buffer_initial_size) { fd_ = -1; } diff --git a/src/IO/WriteBufferFromFile.h b/src/IO/WriteBufferFromFile.h index 57847d893af..c0fa7f0b233 100644 --- a/src/IO/WriteBufferFromFile.h +++ b/src/IO/WriteBufferFromFile.h @@ -36,7 +36,9 @@ public: ThrottlerPtr throttler_ = {}, mode_t mode = 0666, char * existing_memory = nullptr, - size_t alignment = 0); + size_t alignment = 0, + bool use_adaptive_buffer_size_ = false, + size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE); /// Use pre-opened file descriptor. explicit WriteBufferFromFile( @@ -45,7 +47,9 @@ public: size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, ThrottlerPtr throttler_ = {}, char * existing_memory = nullptr, - size_t alignment = 0); + size_t alignment = 0, + bool use_adaptive_buffer_size_ = false, + size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE); ~WriteBufferFromFile() override; diff --git a/src/IO/WriteBufferFromFileDescriptor.cpp b/src/IO/WriteBufferFromFileDescriptor.cpp index f1207edc55b..b60a792e11c 100644 --- a/src/IO/WriteBufferFromFileDescriptor.cpp +++ b/src/IO/WriteBufferFromFileDescriptor.cpp @@ -83,6 +83,13 @@ void WriteBufferFromFileDescriptor::nextImpl() ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written); + + /// Increase buffer size for next data if adaptive buffer size is used and nextImpl was called because of end of buffer. + if (!available() && use_adaptive_buffer_size && memory.size() < adaptive_max_buffer_size) + { + memory.resize(std::min(memory.size() * 2, adaptive_max_buffer_size)); + BufferBase::set(memory.data(), memory.size(), 0); + } } /// NOTE: This class can be used as a very low-level building block, for example @@ -94,11 +101,15 @@ WriteBufferFromFileDescriptor::WriteBufferFromFileDescriptor( char * existing_memory, ThrottlerPtr throttler_, size_t alignment, - std::string file_name_) - : WriteBufferFromFileBase(buf_size, existing_memory, alignment) + std::string file_name_, + bool use_adaptive_buffer_size_, + size_t adaptive_buffer_initial_size) + : WriteBufferFromFileBase(use_adaptive_buffer_size_ ? adaptive_buffer_initial_size : buf_size, existing_memory, alignment) , fd(fd_) , throttler(throttler_) , file_name(std::move(file_name_)) + , use_adaptive_buffer_size(use_adaptive_buffer_size_) + , adaptive_max_buffer_size(buf_size) { } @@ -124,6 +135,7 @@ void WriteBufferFromFileDescriptor::finalizeImpl() return; } + use_adaptive_buffer_size = false; next(); } diff --git a/src/IO/WriteBufferFromFileDescriptor.h b/src/IO/WriteBufferFromFileDescriptor.h index cb73b1e1d08..1008c7cd8d9 100644 --- a/src/IO/WriteBufferFromFileDescriptor.h +++ b/src/IO/WriteBufferFromFileDescriptor.h @@ -18,7 +18,9 @@ public: char * existing_memory = nullptr, ThrottlerPtr throttler_ = {}, size_t alignment = 0, - std::string file_name_ = ""); + std::string file_name_ = "", + bool use_adaptive_buffer_size_ = false, + size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE); /** Could be used before initialization if needed 'fd' was not passed to constructor. * It's not possible to change 'fd' during work. @@ -56,6 +58,12 @@ protected: /// If file has name contains filename, otherwise contains string "(fd=...)" std::string file_name; + /// If true, the size of internal buffer will be exponentially increased up to + /// adaptive_buffer_max_size after each nextImpl call. It can be used to avoid + /// large buffer allocation when actual size of writen data is small. + bool use_adaptive_buffer_size; + size_t adaptive_max_buffer_size; + void finalizeImpl() override; }; diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index e702b4d35ad..d88e393e4e1 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -364,10 +364,16 @@ void WriteBufferFromS3::allocateBuffer() void WriteBufferFromS3::allocateFirstBuffer() { + /// First buffer was already allocated in BufferWithOwnMemory constructor with provided in constructor buffer size. + /// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy. + /// But it may happen that buffer size provided in constructor is larger then desired buffer size from buffer_allocation_policy. + /// Resize memory in this case to the desired size. const auto max_first_buffer = buffer_allocation_policy->getBufferSize(); - const auto size = std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), max_first_buffer); - memory = Memory(size); - WriteBuffer::set(memory.data(), memory.size()); + if (memory.size() > max_first_buffer) + { + memory.resize(max_first_buffer); + WriteBuffer::set(memory.data(), memory.size()); + } } void WriteBufferFromS3::setFakeBufferWhenPreFinalized() diff --git a/src/IO/WriteSettings.h b/src/IO/WriteSettings.h index 84bb25439b5..ce78a3fd26b 100644 --- a/src/IO/WriteSettings.h +++ b/src/IO/WriteSettings.h @@ -25,6 +25,9 @@ struct WriteSettings bool s3_allow_parallel_part_upload = true; bool azure_allow_parallel_part_upload = true; + bool use_adaptive_write_buffer = false; + size_t adaptive_write_buffer_initial_size = 16 * 1024; + bool operator==(const WriteSettings & other) const = default; }; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp index b0e70e94b73..9bfc87135d9 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp @@ -85,11 +85,11 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream( marks_file_extension{marks_file_extension_}, plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)), plain_hashing(*plain_file), - compressor(plain_hashing, compression_codec_, max_compress_block_size_), + compressor(plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size), compressed_hashing(compressor), marks_file(data_part_storage->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)), marks_hashing(*marks_file), - marks_compressor(marks_hashing, marks_compression_codec_, marks_compress_block_size_), + marks_compressor(marks_hashing, marks_compression_codec_, marks_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size), marks_compressed_hashing(marks_compressor), compress_marks(MarkType(marks_file_extension).compressed) { @@ -108,7 +108,7 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream( data_file_extension{data_file_extension_}, plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)), plain_hashing(*plain_file), - compressor(plain_hashing, compression_codec_, max_compress_block_size_), + compressor(plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size), compressed_hashing(compressor), compress_marks(false) { diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 8b6735e0fe2..f050accd7a1 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -177,6 +177,10 @@ void MergeTreeDataPartWriterWide::addStreams( if (!max_compress_block_size) max_compress_block_size = settings.max_compress_block_size; + WriteSettings query_write_settings = settings.query_write_settings; + query_write_settings.use_adaptive_write_buffer = settings.use_adaptive_write_buffer_for_dynamic_subcolumns && ISerialization::isDynamicSubcolumn(substream_path, substream_path.size()); + query_write_settings.adaptive_write_buffer_initial_size = settings.adaptive_write_buffer_initial_size; + column_streams[stream_name] = std::make_unique>( stream_name, data_part_storage, @@ -186,7 +190,7 @@ void MergeTreeDataPartWriterWide::addStreams( max_compress_block_size, marks_compression_codec, settings.marks_compress_block_size, - settings.query_write_settings); + query_write_settings); full_name_to_stream_name.emplace(full_stream_name, stream_name); stream_name_to_full_name.emplace(stream_name, full_stream_name); diff --git a/src/Storages/MergeTree/MergeTreeIOSettings.cpp b/src/Storages/MergeTree/MergeTreeIOSettings.cpp index 24cb25afe47..19365a90a14 100644 --- a/src/Storages/MergeTree/MergeTreeIOSettings.cpp +++ b/src/Storages/MergeTree/MergeTreeIOSettings.cpp @@ -30,6 +30,8 @@ MergeTreeWriterSettings::MergeTreeWriterSettings( , low_cardinality_max_dictionary_size(global_settings.low_cardinality_max_dictionary_size) , low_cardinality_use_single_dictionary_for_part(global_settings.low_cardinality_use_single_dictionary_for_part != 0) , use_compact_variant_discriminators_serialization(storage_settings->use_compact_variant_discriminators_serialization) + , use_adaptive_write_buffer_for_dynamic_subcolumns(storage_settings->use_adaptive_write_buffer_for_dynamic_subcolumns) + , adaptive_write_buffer_initial_size(storage_settings->adaptive_write_buffer_initial_size) { } diff --git a/src/Storages/MergeTree/MergeTreeIOSettings.h b/src/Storages/MergeTree/MergeTreeIOSettings.h index 47b174b2e29..fcc72815d8f 100644 --- a/src/Storages/MergeTree/MergeTreeIOSettings.h +++ b/src/Storages/MergeTree/MergeTreeIOSettings.h @@ -80,6 +80,8 @@ struct MergeTreeWriterSettings size_t low_cardinality_max_dictionary_size; bool low_cardinality_use_single_dictionary_for_part; bool use_compact_variant_discriminators_serialization; + bool use_adaptive_write_buffer_for_dynamic_subcolumns; + size_t adaptive_write_buffer_initial_size; }; } diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 0769b60dc6b..dcb18155114 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -99,6 +99,8 @@ struct Settings; M(Bool, add_implicit_sign_column_constraint_for_collapsing_engine, false, "If true, add implicit constraint for sign column for CollapsingMergeTree engine.", 0) \ M(Milliseconds, sleep_before_commit_local_part_in_replicated_table_ms, 0, "For testing. Do not change it.", 0) \ M(Bool, optimize_row_order, false, "Allow reshuffling of rows during part inserts and merges to improve the compressibility of the new part", 0) \ + M(Bool, use_adaptive_write_buffer_for_dynamic_subcolumns, true, "Allow to use adaptive writer buffers during writing dynamic subcolumns to reduce memory usage", 0) \ + M(UInt64, adaptive_write_buffer_initial_size, 16 * 1024, "Initial size of an adaptive write buffer", 0) \ \ /* Part removal settings. */ \ M(UInt64, simultaneous_parts_removal_limit, 0, "Maximum number of parts to remove during one CleanupThread iteration (0 means unlimited).", 0) \