Reduce memory usage of inserts to JSON by using adaptive write buffer size

This commit is contained in:
avogar 2024-09-04 17:12:17 +00:00
parent a7d0a5991e
commit 4b322ee3c5
20 changed files with 145 additions and 22 deletions

View File

@ -55,10 +55,29 @@ void CompressedWriteBuffer::nextImpl()
out.write(compressed_buffer.data(), compressed_size); out.write(compressed_buffer.data(), compressed_size);
} }
/// Increase buffer size for next data if adaptive buffer size is used and nextImpl was called because of end of buffer.
if (!available() && use_adaptive_buffer_size && memory.size() < adaptive_buffer_max_size)
{
memory.resize(std::min(memory.size() * 2, adaptive_buffer_max_size));
BufferBase::set(memory.data(), memory.size(), 0);
}
} }
CompressedWriteBuffer::CompressedWriteBuffer(WriteBuffer & out_, CompressionCodecPtr codec_, size_t buf_size) void CompressedWriteBuffer::finalizeImpl()
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), codec(std::move(codec_)) {
/// Don't try to resize buffer in nextImpl.
use_adaptive_buffer_size = false;
next();
}
CompressedWriteBuffer::CompressedWriteBuffer(
WriteBuffer & out_, CompressionCodecPtr codec_, size_t buf_size, bool use_adaptive_buffer_size_, size_t adaptive_buffer_initial_size)
: BufferWithOwnMemory<WriteBuffer>(use_adaptive_buffer_size_ ? adaptive_buffer_initial_size : buf_size)
, out(out_)
, codec(std::move(codec_))
, use_adaptive_buffer_size(use_adaptive_buffer_size_)
, adaptive_buffer_max_size(buf_size)
{ {
} }

View File

@ -19,7 +19,9 @@ public:
explicit CompressedWriteBuffer( explicit CompressedWriteBuffer(
WriteBuffer & out_, WriteBuffer & out_,
CompressionCodecPtr codec_ = CompressionCodecFactory::instance().getDefaultCodec(), CompressionCodecPtr codec_ = CompressionCodecFactory::instance().getDefaultCodec(),
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE); size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
bool use_adaptive_buffer_size_ = false,
size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE);
~CompressedWriteBuffer() override; ~CompressedWriteBuffer() override;
@ -45,10 +47,17 @@ public:
private: private:
void nextImpl() override; void nextImpl() override;
void finalizeImpl() override;
WriteBuffer & out; WriteBuffer & out;
CompressionCodecPtr codec; CompressionCodecPtr codec;
/// If true, the size of internal buffer will be exponentially increased up to
/// adaptive_buffer_max_size after each nextImpl call. It can be used to avoid
/// large buffer allocation when actual size of writen data is small.
bool use_adaptive_buffer_size;
size_t adaptive_buffer_max_size;
PODArray<char> compressed_buffer; PODArray<char> compressed_buffer;
}; };

View File

@ -20,6 +20,9 @@ static constexpr auto DBMS_DEFAULT_POLL_INTERVAL = 10;
/// The size of the I/O buffer by default. /// The size of the I/O buffer by default.
static constexpr auto DBMS_DEFAULT_BUFFER_SIZE = 1048576ULL; static constexpr auto DBMS_DEFAULT_BUFFER_SIZE = 1048576ULL;
/// The initial size of adaptive I/O buffer by default.
static constexpr auto DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE = 16384ULL;
static constexpr auto PADDING_FOR_SIMD = 64; static constexpr auto PADDING_FOR_SIMD = 64;
/** Which blocks by default read the data (by number of rows). /** Which blocks by default read the data (by number of rows).

View File

@ -420,6 +420,20 @@ bool ISerialization::isEphemeralSubcolumn(const DB::ISerialization::SubstreamPat
return path[last_elem].type == Substream::VariantElementNullMap; return path[last_elem].type == Substream::VariantElementNullMap;
} }
bool ISerialization::isDynamicSubcolumn(const DB::ISerialization::SubstreamPath & path, size_t prefix_len)
{
if (prefix_len == 0 || prefix_len > path.size())
return false;
for (size_t i = 0; i != prefix_len; ++i)
{
if (path[i].type == SubstreamType::DynamicData || path[i].type == SubstreamType::ObjectData)
return true;
}
return false;
}
ISerialization::SubstreamData ISerialization::createFromPath(const SubstreamPath & path, size_t prefix_len) ISerialization::SubstreamData ISerialization::createFromPath(const SubstreamPath & path, size_t prefix_len)
{ {
assert(prefix_len <= path.size()); assert(prefix_len <= path.size());

View File

@ -457,6 +457,9 @@ public:
/// for writing/reading data. For example, it's a null-map subcolumn of Variant type (it's always constructed from discriminators);. /// for writing/reading data. For example, it's a null-map subcolumn of Variant type (it's always constructed from discriminators);.
static bool isEphemeralSubcolumn(const SubstreamPath & path, size_t prefix_len); static bool isEphemeralSubcolumn(const SubstreamPath & path, size_t prefix_len);
/// Returns true if stream with specified path corresponds to dynamic subcolumn.
static bool isDynamicSubcolumn(const SubstreamPath & path, size_t prefix_len);
protected: protected:
template <typename State, typename StatePtr> template <typename State, typename StatePtr>
State * checkAndGetState(const StatePtr & state) const; State * checkAndGetState(const StatePtr & state) const;

View File

@ -23,6 +23,8 @@
#include <Common/randomSeed.h> #include <Common/randomSeed.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/AdaptiveWriteBufferFromFile.h>
#include <IO/AdaptiveWriteBuffer.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
@ -339,7 +341,15 @@ DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode, const
{ {
int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1; int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1;
return std::make_unique<WriteBufferFromFile>( return std::make_unique<WriteBufferFromFile>(
fs::path(disk_path) / path, buf_size, flags, settings.local_throttler); fs::path(disk_path) / path,
buf_size,
flags,
settings.local_throttler,
0666,
nullptr,
0,
settings.use_adaptive_write_buffer,
settings.adaptive_write_buffer_initial_size);
} }
std::vector<String> DiskLocal::getBlobPath(const String & path) const std::vector<String> DiskLocal::getBlobPath(const String & path) const

View File

@ -251,12 +251,29 @@ void WriteBufferFromAzureBlobStorage::allocateBuffer()
auto size = buffer_allocation_policy->getBufferSize(); auto size = buffer_allocation_policy->getBufferSize();
if (buffer_allocation_policy->getBufferNumber() == 1) if (buffer_allocation_policy->getBufferNumber() == 1)
size = std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), size); {
allocateFirstBuffer();
return;
}
memory = Memory(size); memory = Memory(size);
WriteBuffer::set(memory.data(), memory.size()); WriteBuffer::set(memory.data(), memory.size());
} }
void WriteBufferFromAzureBlobStorage::allocateFirstBuffer()
{
/// First buffer was already allocated in BufferWithOwnMemory constructor with buffer size provided in constructor.
/// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy.
/// But it may happen that buffer size provided in constructor is larger then desired buffer size from buffer_allocation_policy.
/// Resize memory in this case to the desired size.
const auto max_first_buffer = buffer_allocation_policy->getBufferSize();
if (memory.size() > max_first_buffer)
{
memory.resize(max_first_buffer);
WriteBuffer::set(memory.data(), memory.size());
}
}
void WriteBufferFromAzureBlobStorage::detachBuffer() void WriteBufferFromAzureBlobStorage::detachBuffer()
{ {
size_t data_size = size_t(position() - memory.data()); size_t data_size = size_t(position() - memory.data());

View File

@ -53,6 +53,7 @@ private:
void detachBuffer(); void detachBuffer();
void reallocateFirstBuffer(); void reallocateFirstBuffer();
void allocateBuffer(); void allocateBuffer();
void allocateFirstBuffer();
void hidePartialData(); void hidePartialData();
void setFakeBufferWhenPreFinalized(); void setFakeBufferWhenPreFinalized();

View File

@ -282,7 +282,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
client.get(), client.get(),
uri.bucket, uri.bucket,
object.remote_path, object.remote_path,
buf_size, write_settings.use_adaptive_write_buffer ? write_settings.adaptive_write_buffer_initial_size : buf_size,
request_settings, request_settings,
std::move(blob_storage_log), std::move(blob_storage_log),
attributes, attributes,

View File

@ -32,8 +32,10 @@ WriteBufferFromFile::WriteBufferFromFile(
ThrottlerPtr throttler_, ThrottlerPtr throttler_,
mode_t mode, mode_t mode,
char * existing_memory, char * existing_memory,
size_t alignment) size_t alignment,
: WriteBufferFromFileDescriptor(-1, buf_size, existing_memory, throttler_, alignment, file_name_) bool use_adaptive_buffer_size_,
size_t adaptive_buffer_initial_size)
: WriteBufferFromFileDescriptor(-1, buf_size, existing_memory, throttler_, alignment, file_name_, use_adaptive_buffer_size_, adaptive_buffer_initial_size)
{ {
ProfileEvents::increment(ProfileEvents::FileOpen); ProfileEvents::increment(ProfileEvents::FileOpen);
@ -66,8 +68,10 @@ WriteBufferFromFile::WriteBufferFromFile(
size_t buf_size, size_t buf_size,
ThrottlerPtr throttler_, ThrottlerPtr throttler_,
char * existing_memory, char * existing_memory,
size_t alignment) size_t alignment,
: WriteBufferFromFileDescriptor(fd_, buf_size, existing_memory, throttler_, alignment, original_file_name) bool use_adaptive_buffer_size_,
size_t adaptive_buffer_initial_size)
: WriteBufferFromFileDescriptor(fd_, buf_size, existing_memory, throttler_, alignment, original_file_name, use_adaptive_buffer_size_, adaptive_buffer_initial_size)
{ {
fd_ = -1; fd_ = -1;
} }

View File

@ -36,7 +36,9 @@ public:
ThrottlerPtr throttler_ = {}, ThrottlerPtr throttler_ = {},
mode_t mode = 0666, mode_t mode = 0666,
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0); size_t alignment = 0,
bool use_adaptive_buffer_size_ = false,
size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE);
/// Use pre-opened file descriptor. /// Use pre-opened file descriptor.
explicit WriteBufferFromFile( explicit WriteBufferFromFile(
@ -45,7 +47,9 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
ThrottlerPtr throttler_ = {}, ThrottlerPtr throttler_ = {},
char * existing_memory = nullptr, char * existing_memory = nullptr,
size_t alignment = 0); size_t alignment = 0,
bool use_adaptive_buffer_size_ = false,
size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE);
~WriteBufferFromFile() override; ~WriteBufferFromFile() override;

View File

@ -83,6 +83,13 @@ void WriteBufferFromFileDescriptor::nextImpl()
ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written); ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written);
/// Increase buffer size for next data if adaptive buffer size is used and nextImpl was called because of end of buffer.
if (!available() && use_adaptive_buffer_size && memory.size() < adaptive_max_buffer_size)
{
memory.resize(std::min(memory.size() * 2, adaptive_max_buffer_size));
BufferBase::set(memory.data(), memory.size(), 0);
}
} }
/// NOTE: This class can be used as a very low-level building block, for example /// NOTE: This class can be used as a very low-level building block, for example
@ -94,11 +101,15 @@ WriteBufferFromFileDescriptor::WriteBufferFromFileDescriptor(
char * existing_memory, char * existing_memory,
ThrottlerPtr throttler_, ThrottlerPtr throttler_,
size_t alignment, size_t alignment,
std::string file_name_) std::string file_name_,
: WriteBufferFromFileBase(buf_size, existing_memory, alignment) bool use_adaptive_buffer_size_,
size_t adaptive_buffer_initial_size)
: WriteBufferFromFileBase(use_adaptive_buffer_size_ ? adaptive_buffer_initial_size : buf_size, existing_memory, alignment)
, fd(fd_) , fd(fd_)
, throttler(throttler_) , throttler(throttler_)
, file_name(std::move(file_name_)) , file_name(std::move(file_name_))
, use_adaptive_buffer_size(use_adaptive_buffer_size_)
, adaptive_max_buffer_size(buf_size)
{ {
} }
@ -124,6 +135,7 @@ void WriteBufferFromFileDescriptor::finalizeImpl()
return; return;
} }
use_adaptive_buffer_size = false;
next(); next();
} }

View File

@ -18,7 +18,9 @@ public:
char * existing_memory = nullptr, char * existing_memory = nullptr,
ThrottlerPtr throttler_ = {}, ThrottlerPtr throttler_ = {},
size_t alignment = 0, size_t alignment = 0,
std::string file_name_ = ""); std::string file_name_ = "",
bool use_adaptive_buffer_size_ = false,
size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE);
/** Could be used before initialization if needed 'fd' was not passed to constructor. /** Could be used before initialization if needed 'fd' was not passed to constructor.
* It's not possible to change 'fd' during work. * It's not possible to change 'fd' during work.
@ -56,6 +58,12 @@ protected:
/// If file has name contains filename, otherwise contains string "(fd=...)" /// If file has name contains filename, otherwise contains string "(fd=...)"
std::string file_name; std::string file_name;
/// If true, the size of internal buffer will be exponentially increased up to
/// adaptive_buffer_max_size after each nextImpl call. It can be used to avoid
/// large buffer allocation when actual size of writen data is small.
bool use_adaptive_buffer_size;
size_t adaptive_max_buffer_size;
void finalizeImpl() override; void finalizeImpl() override;
}; };

View File

@ -364,10 +364,16 @@ void WriteBufferFromS3::allocateBuffer()
void WriteBufferFromS3::allocateFirstBuffer() void WriteBufferFromS3::allocateFirstBuffer()
{ {
/// First buffer was already allocated in BufferWithOwnMemory constructor with provided in constructor buffer size.
/// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy.
/// But it may happen that buffer size provided in constructor is larger then desired buffer size from buffer_allocation_policy.
/// Resize memory in this case to the desired size.
const auto max_first_buffer = buffer_allocation_policy->getBufferSize(); const auto max_first_buffer = buffer_allocation_policy->getBufferSize();
const auto size = std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), max_first_buffer); if (memory.size() > max_first_buffer)
memory = Memory(size); {
WriteBuffer::set(memory.data(), memory.size()); memory.resize(max_first_buffer);
WriteBuffer::set(memory.data(), memory.size());
}
} }
void WriteBufferFromS3::setFakeBufferWhenPreFinalized() void WriteBufferFromS3::setFakeBufferWhenPreFinalized()

View File

@ -25,6 +25,9 @@ struct WriteSettings
bool s3_allow_parallel_part_upload = true; bool s3_allow_parallel_part_upload = true;
bool azure_allow_parallel_part_upload = true; bool azure_allow_parallel_part_upload = true;
bool use_adaptive_write_buffer = false;
size_t adaptive_write_buffer_initial_size = 16 * 1024;
bool operator==(const WriteSettings & other) const = default; bool operator==(const WriteSettings & other) const = default;
}; };

View File

@ -85,11 +85,11 @@ MergeTreeDataPartWriterOnDisk::Stream<false>::Stream(
marks_file_extension{marks_file_extension_}, marks_file_extension{marks_file_extension_},
plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)), plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)),
plain_hashing(*plain_file), plain_hashing(*plain_file),
compressor(plain_hashing, compression_codec_, max_compress_block_size_), compressor(plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size),
compressed_hashing(compressor), compressed_hashing(compressor),
marks_file(data_part_storage->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)), marks_file(data_part_storage->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)),
marks_hashing(*marks_file), marks_hashing(*marks_file),
marks_compressor(marks_hashing, marks_compression_codec_, marks_compress_block_size_), marks_compressor(marks_hashing, marks_compression_codec_, marks_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size),
marks_compressed_hashing(marks_compressor), marks_compressed_hashing(marks_compressor),
compress_marks(MarkType(marks_file_extension).compressed) compress_marks(MarkType(marks_file_extension).compressed)
{ {
@ -108,7 +108,7 @@ MergeTreeDataPartWriterOnDisk::Stream<true>::Stream(
data_file_extension{data_file_extension_}, data_file_extension{data_file_extension_},
plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)), plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)),
plain_hashing(*plain_file), plain_hashing(*plain_file),
compressor(plain_hashing, compression_codec_, max_compress_block_size_), compressor(plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size),
compressed_hashing(compressor), compressed_hashing(compressor),
compress_marks(false) compress_marks(false)
{ {

View File

@ -177,6 +177,10 @@ void MergeTreeDataPartWriterWide::addStreams(
if (!max_compress_block_size) if (!max_compress_block_size)
max_compress_block_size = settings.max_compress_block_size; max_compress_block_size = settings.max_compress_block_size;
WriteSettings query_write_settings = settings.query_write_settings;
query_write_settings.use_adaptive_write_buffer = settings.use_adaptive_write_buffer_for_dynamic_subcolumns && ISerialization::isDynamicSubcolumn(substream_path, substream_path.size());
query_write_settings.adaptive_write_buffer_initial_size = settings.adaptive_write_buffer_initial_size;
column_streams[stream_name] = std::make_unique<Stream<false>>( column_streams[stream_name] = std::make_unique<Stream<false>>(
stream_name, stream_name,
data_part_storage, data_part_storage,
@ -186,7 +190,7 @@ void MergeTreeDataPartWriterWide::addStreams(
max_compress_block_size, max_compress_block_size,
marks_compression_codec, marks_compression_codec,
settings.marks_compress_block_size, settings.marks_compress_block_size,
settings.query_write_settings); query_write_settings);
full_name_to_stream_name.emplace(full_stream_name, stream_name); full_name_to_stream_name.emplace(full_stream_name, stream_name);
stream_name_to_full_name.emplace(stream_name, full_stream_name); stream_name_to_full_name.emplace(stream_name, full_stream_name);

View File

@ -30,6 +30,8 @@ MergeTreeWriterSettings::MergeTreeWriterSettings(
, low_cardinality_max_dictionary_size(global_settings.low_cardinality_max_dictionary_size) , low_cardinality_max_dictionary_size(global_settings.low_cardinality_max_dictionary_size)
, low_cardinality_use_single_dictionary_for_part(global_settings.low_cardinality_use_single_dictionary_for_part != 0) , low_cardinality_use_single_dictionary_for_part(global_settings.low_cardinality_use_single_dictionary_for_part != 0)
, use_compact_variant_discriminators_serialization(storage_settings->use_compact_variant_discriminators_serialization) , use_compact_variant_discriminators_serialization(storage_settings->use_compact_variant_discriminators_serialization)
, use_adaptive_write_buffer_for_dynamic_subcolumns(storage_settings->use_adaptive_write_buffer_for_dynamic_subcolumns)
, adaptive_write_buffer_initial_size(storage_settings->adaptive_write_buffer_initial_size)
{ {
} }

View File

@ -80,6 +80,8 @@ struct MergeTreeWriterSettings
size_t low_cardinality_max_dictionary_size; size_t low_cardinality_max_dictionary_size;
bool low_cardinality_use_single_dictionary_for_part; bool low_cardinality_use_single_dictionary_for_part;
bool use_compact_variant_discriminators_serialization; bool use_compact_variant_discriminators_serialization;
bool use_adaptive_write_buffer_for_dynamic_subcolumns;
size_t adaptive_write_buffer_initial_size;
}; };
} }

View File

@ -99,6 +99,8 @@ struct Settings;
M(Bool, add_implicit_sign_column_constraint_for_collapsing_engine, false, "If true, add implicit constraint for sign column for CollapsingMergeTree engine.", 0) \ M(Bool, add_implicit_sign_column_constraint_for_collapsing_engine, false, "If true, add implicit constraint for sign column for CollapsingMergeTree engine.", 0) \
M(Milliseconds, sleep_before_commit_local_part_in_replicated_table_ms, 0, "For testing. Do not change it.", 0) \ M(Milliseconds, sleep_before_commit_local_part_in_replicated_table_ms, 0, "For testing. Do not change it.", 0) \
M(Bool, optimize_row_order, false, "Allow reshuffling of rows during part inserts and merges to improve the compressibility of the new part", 0) \ M(Bool, optimize_row_order, false, "Allow reshuffling of rows during part inserts and merges to improve the compressibility of the new part", 0) \
M(Bool, use_adaptive_write_buffer_for_dynamic_subcolumns, true, "Allow to use adaptive writer buffers during writing dynamic subcolumns to reduce memory usage", 0) \
M(UInt64, adaptive_write_buffer_initial_size, 16 * 1024, "Initial size of an adaptive write buffer", 0) \
\ \
/* Part removal settings. */ \ /* Part removal settings. */ \
M(UInt64, simultaneous_parts_removal_limit, 0, "Maximum number of parts to remove during one CleanupThread iteration (0 means unlimited).", 0) \ M(UInt64, simultaneous_parts_removal_limit, 0, "Maximum number of parts to remove during one CleanupThread iteration (0 means unlimited).", 0) \