From 704ec8dea6b4fa583689508e4362644517321005 Mon Sep 17 00:00:00 2001
From: Matt Woenker
Date: Mon, 5 Aug 2024 14:55:33 -0400
Subject: [PATCH 01/24] Handle incomplete sequences at end of input

---
 src/IO/WriteBufferValidUTF8.cpp | 24 +++++++++++++++++++++---
 1 file changed, 21 insertions(+), 3 deletions(-)

diff --git a/src/IO/WriteBufferValidUTF8.cpp b/src/IO/WriteBufferValidUTF8.cpp
index d611befac37..426f302cb02 100644
--- a/src/IO/WriteBufferValidUTF8.cpp
+++ b/src/IO/WriteBufferValidUTF8.cpp
@@ -149,9 +149,27 @@ void WriteBufferValidUTF8::finalizeImpl()
     /// Write all complete sequences from buffer.
     nextImpl();
 
-    /// If unfinished sequence at end, then write replacement.
-    if (working_buffer.begin() != memory.data())
-        putReplacement();
+    /// Handle remaining bytes if we have an incomplete sequence
+    if (working_buffer.begin() != memory.data()) {
+        char * p = memory.data();
+
+        while (p < pos) {
+            UInt8 len = length_of_utf8_sequence[static_cast<unsigned char>(*p)];
+            if (p + len > pos) {
+                // Incomplete sequence. Skip one byte.
+                putReplacement();
+                ++p;
+            } else if (Poco::UTF8Encoding::isLegal(reinterpret_cast<unsigned char *>(p), len)) {
+                // Valid sequence
+                putValid(p, len);
+                p += len;
+            } else {
+                // Invalid sequence, skip first byte.
+                putReplacement();
+                ++p;
+            }
+        }
+    }
 }
 }

From 2a1ee419b473350323c5582bcb950ab5630a07e2 Mon Sep 17 00:00:00 2001
From: Matt Woenker
Date: Tue, 6 Aug 2024 15:53:49 -0400
Subject: [PATCH 02/24] Fix style issue

---
 src/IO/WriteBufferValidUTF8.cpp | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/src/IO/WriteBufferValidUTF8.cpp b/src/IO/WriteBufferValidUTF8.cpp
index 426f302cb02..8441b4eafa2 100644
--- a/src/IO/WriteBufferValidUTF8.cpp
+++ b/src/IO/WriteBufferValidUTF8.cpp
@@ -150,20 +150,27 @@ void WriteBufferValidUTF8::finalizeImpl()
     nextImpl();
 
     /// Handle remaining bytes if we have an incomplete sequence
-    if (working_buffer.begin() != memory.data()) {
+    if (working_buffer.begin() != memory.data())
+    {
         char * p = memory.data();
 
-        while (p < pos) {
+        while (p < pos)
+        {
             UInt8 len = length_of_utf8_sequence[static_cast<unsigned char>(*p)];
-            if (p + len > pos) {
+            if (p + len > pos)
+            {
                 // Incomplete sequence. Skip one byte.
                 putReplacement();
                 ++p;
-            } else if (Poco::UTF8Encoding::isLegal(reinterpret_cast<unsigned char *>(p), len)) {
+            }
+            else if (Poco::UTF8Encoding::isLegal(reinterpret_cast<unsigned char *>(p), len))
+            {
                 // Valid sequence
                 putValid(p, len);
                 p += len;
-            } else {
+            }
+            else
+            {
                 // Invalid sequence, skip first byte.
                 putReplacement();
                 ++p;
             }
         }
     }
 }

From f4ed3f5c6d138dc6ac7209393c60562ddf9e8d24 Mon Sep 17 00:00:00 2001
From: Matt Woenker
Date: Tue, 6 Aug 2024 16:14:26 -0400
Subject: [PATCH 03/24] Add test

---
 .../03221_incomplete-utf8-sequence.reference | 16 ++++++++++++++++
 .../03221_incomplete-utf8-sequence.sql       |  2 ++
 2 files changed, 18 insertions(+)
 create mode 100644 tests/queries/0_stateless/03221_incomplete-utf8-sequence.reference
 create mode 100644 tests/queries/0_stateless/03221_incomplete-utf8-sequence.sql

diff --git a/tests/queries/0_stateless/03221_incomplete-utf8-sequence.reference b/tests/queries/0_stateless/03221_incomplete-utf8-sequence.reference
new file mode 100644
index 00000000000..4577427251d
--- /dev/null
+++ b/tests/queries/0_stateless/03221_incomplete-utf8-sequence.reference
@@ -0,0 +1,16 @@
+{
+    "meta":
+    [
+        {
+            "name": "unhex('f0')",
+            "type": "String"
+        }
+    ],
+
+    "data":
+    [
+        ["�"]
+    ],
+
+    "rows": 1
+}
diff --git a/tests/queries/0_stateless/03221_incomplete-utf8-sequence.sql b/tests/queries/0_stateless/03221_incomplete-utf8-sequence.sql
new file mode 100644
index 00000000000..ee4f25f3b4a
--- /dev/null
+++ b/tests/queries/0_stateless/03221_incomplete-utf8-sequence.sql
@@ -0,0 +1,2 @@
+SET output_format_write_statistics = 0;
+SELECT unhex('f0') FORMAT JSONCompact;
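The loop added in patch 01 (and reformatted in patch 02) classifies whatever bytes remain in the buffer when input ends: a sequence that would run past the end of the data, or an illegal sequence, emits one replacement character and advances a single byte. Below is a minimal standalone sketch of that classification, not ClickHouse code: a hand-rolled lead-byte function stands in for `length_of_utf8_sequence`, and the legality check done by `Poco::UTF8Encoding::isLegal` is omitted.

#include <cstdint>
#include <cstdio>

// Simplified stand-in for length_of_utf8_sequence: length implied by the lead byte.
static uint8_t utf8SequenceLength(uint8_t byte)
{
    if (byte < 0x80) return 1;              // ASCII
    if ((byte & 0xE0) == 0xC0) return 2;    // 110xxxxx
    if ((byte & 0xF0) == 0xE0) return 3;    // 1110xxxx
    if ((byte & 0xF8) == 0xF0) return 4;    // 11110xxx
    return 1;                               // invalid lead byte: consume one byte
}

int main()
{
    const char data[] = "ok\xF0";    // 'o', 'k', then a truncated 4-byte sequence
    const char * p = data;
    const char * pos = data + 3;     // input ends here

    while (p < pos)
    {
        uint8_t len = utf8SequenceLength(static_cast<uint8_t>(*p));
        if (p + len > pos)
        {
            // Incomplete sequence at end of input: emit U+FFFD, skip one byte.
            std::puts("\xEF\xBF\xBD (replacement: incomplete sequence)");
            ++p;
        }
        else
        {
            // Complete sequence (legality check omitted in this sketch).
            std::printf("%.*s (valid)\n", len, p);
            p += len;
        }
    }
    return 0;
}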
From cff7cbac94b1b6f39dd4585fca5bcbcb2f952747 Mon Sep 17 00:00:00 2001
From: Matt Woenker
Date: Wed, 7 Aug 2024 16:21:16 -0400
Subject: [PATCH 04/24] Style fix

---
 src/IO/WriteBufferValidUTF8.cpp | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/IO/WriteBufferValidUTF8.cpp b/src/IO/WriteBufferValidUTF8.cpp
index 8441b4eafa2..25c7b3d4820 100644
--- a/src/IO/WriteBufferValidUTF8.cpp
+++ b/src/IO/WriteBufferValidUTF8.cpp
@@ -159,19 +159,19 @@ void WriteBufferValidUTF8::finalizeImpl()
             UInt8 len = length_of_utf8_sequence[static_cast<unsigned char>(*p)];
             if (p + len > pos)
             {
-                // Incomplete sequence. Skip one byte.
+                /// Incomplete sequence. Skip one byte.
                 putReplacement();
                 ++p;
             }
             else if (Poco::UTF8Encoding::isLegal(reinterpret_cast<unsigned char *>(p), len))
             {
-                // Valid sequence
+                /// Valid sequence
                 putValid(p, len);
                 p += len;
             }
             else
             {
-                // Invalid sequence, skip first byte.
+                /// Invalid sequence, skip first byte.
                 putReplacement();
                 ++p;
             }

From d09531e48a90db637a3e5b42a2fd9569c911ed44 Mon Sep 17 00:00:00 2001
From: Matt Woenker
Date: Wed, 7 Aug 2024 16:25:01 -0400
Subject: [PATCH 05/24] Rename test files

---
 ...equence.reference => 03221_incomplete_utf8_sequence.reference} | 0
 ...plete-utf8-sequence.sql => 03221_incomplete_utf8_sequence.sql} | 0
 2 files changed, 0 insertions(+), 0 deletions(-)
 rename tests/queries/0_stateless/{03221_incomplete-utf8-sequence.reference => 03221_incomplete_utf8_sequence.reference} (100%)
 rename tests/queries/0_stateless/{03221_incomplete-utf8-sequence.sql => 03221_incomplete_utf8_sequence.sql} (100%)

diff --git a/tests/queries/0_stateless/03221_incomplete-utf8-sequence.reference b/tests/queries/0_stateless/03221_incomplete_utf8_sequence.reference
similarity index 100%
rename from tests/queries/0_stateless/03221_incomplete-utf8-sequence.reference
rename to tests/queries/0_stateless/03221_incomplete_utf8_sequence.reference
diff --git a/tests/queries/0_stateless/03221_incomplete-utf8-sequence.sql b/tests/queries/0_stateless/03221_incomplete_utf8_sequence.sql
similarity index 100%
rename from tests/queries/0_stateless/03221_incomplete-utf8-sequence.sql
rename to tests/queries/0_stateless/03221_incomplete_utf8_sequence.sql

From f8a14e86d8db56e8ad7046b8848c7521d235a777 Mon Sep 17 00:00:00 2001
From: Matt Woenker
Date: Wed, 7 Aug 2024 16:32:13 -0400
Subject: [PATCH 06/24] Add const a few places

---
 src/IO/WriteBufferValidUTF8.cpp | 8 ++++----
 src/IO/WriteBufferValidUTF8.h   | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/IO/WriteBufferValidUTF8.cpp b/src/IO/WriteBufferValidUTF8.cpp
index 25c7b3d4820..2a86f8c2801 100644
--- a/src/IO/WriteBufferValidUTF8.cpp
+++ b/src/IO/WriteBufferValidUTF8.cpp
@@ -54,7 +54,7 @@ inline void WriteBufferValidUTF8::putReplacement()
 }
 
-inline void WriteBufferValidUTF8::putValid(char *data, size_t len)
+inline void WriteBufferValidUTF8::putValid(const char *data, size_t len)
 {
     if (len == 0)
         return;
@@ -152,18 +152,18 @@ void WriteBufferValidUTF8::finalizeImpl()
     nextImpl();
 
     /// Handle remaining bytes if we have an incomplete sequence
     if (working_buffer.begin() != memory.data())
     {
-        char * p = memory.data();
+        const char * p = memory.data();
 
         while (p < pos)
         {
-            UInt8 len = length_of_utf8_sequence[static_cast<unsigned char>(*p)];
+            UInt8 len = length_of_utf8_sequence[static_cast<const unsigned char>(*p)];
             if (p + len > pos)
             {
                 /// Incomplete sequence. Skip one byte.
                 putReplacement();
                 ++p;
             }
-            else if (Poco::UTF8Encoding::isLegal(reinterpret_cast<unsigned char *>(p), len))
+            else if (Poco::UTF8Encoding::isLegal(reinterpret_cast<const unsigned char *>(p), len))
             {
                 /// Valid sequence
                 putValid(p, len);
                 p += len;
             }

diff --git a/src/IO/WriteBufferValidUTF8.h b/src/IO/WriteBufferValidUTF8.h
index daaf0427f88..a398b8ded01 100644
--- a/src/IO/WriteBufferValidUTF8.h
+++ b/src/IO/WriteBufferValidUTF8.h
@@ -26,7 +26,7 @@ public:
 
 private:
     void putReplacement();
-    void putValid(char * data, size_t len);
+    void putValid(const char * data, size_t len);
 
     void nextImpl() override;
     void finalizeImpl() override;
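The next patch (07) threads an adaptive write-buffer mode through the write path: buffers start at a small initial size and double on each refill until they reach the size that would previously have been allocated up front. A standalone sketch of that growth policy follows (illustrative names only, not the ClickHouse classes; the real logic lives in CompressedWriteBuffer::nextImpl and WriteBufferFromFileDescriptor::nextImpl):

#include <algorithm>
#include <cstdio>
#include <vector>

// Sketch of the adaptive-growth policy: start small, double after each
// flush that completely filled the buffer, cap at the configured maximum.
struct AdaptiveBuffer
{
    std::vector<char> memory;
    size_t max_size;

    AdaptiveBuffer(size_t initial_size, size_t max_size_)
        : memory(initial_size), max_size(max_size_) {}

    // Called when the buffer was full and has just been flushed downstream.
    void onFlushedFull()
    {
        if (memory.size() < max_size)
            memory.resize(std::min(memory.size() * 2, max_size));
    }
};

int main()
{
    AdaptiveBuffer buf(16 * 1024, 1 << 20);   // 16 KiB initial, 1 MiB cap
    for (int i = 0; i < 8; ++i)
    {
        std::printf("buffer size: %zu\n", buf.memory.size());
        buf.onFlushedFull();
    }
    return 0;
}

Small inserts never pay for a full-size buffer; only writers that actually stream a lot of data grow to the cap.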
From 4b322ee3c5e7a40f9bc3bc4fec8100f33dabf750 Mon Sep 17 00:00:00 2001
From: avogar
Date: Wed, 4 Sep 2024 17:12:17 +0000
Subject: [PATCH 07/24] Reduce memory usage of inserts to JSON by using adaptive write buffer size

---
 src/Compression/CompressedWriteBuffer.cpp      | 23 +++++++++++++++++--
 src/Compression/CompressedWriteBuffer.h        | 11 ++++++++-
 src/Core/Defines.h                             |  3 +++
 .../Serializations/ISerialization.cpp          | 14 +++++++++++
 src/DataTypes/Serializations/ISerialization.h  |  3 +++
 src/Disks/DiskLocal.cpp                        | 12 +++++++++-
 .../IO/WriteBufferFromAzureBlobStorage.cpp     | 19 ++++++++++++++-
 .../IO/WriteBufferFromAzureBlobStorage.h       |  1 +
 .../ObjectStorages/S3/S3ObjectStorage.cpp      |  2 +-
 src/IO/WriteBufferFromFile.cpp                 | 12 ++++++----
 src/IO/WriteBufferFromFile.h                   |  8 +++++--
 src/IO/WriteBufferFromFileDescriptor.cpp       | 16 +++++++++++--
 src/IO/WriteBufferFromFileDescriptor.h         | 10 +++++++-
 src/IO/WriteBufferFromS3.cpp                   | 12 +++++++---
 src/IO/WriteSettings.h                         |  3 +++
 .../MergeTreeDataPartWriterOnDisk.cpp          |  6 ++---
 .../MergeTree/MergeTreeDataPartWriterWide.cpp  |  6 ++++-
 .../MergeTree/MergeTreeIOSettings.cpp          |  2 ++
 src/Storages/MergeTree/MergeTreeIOSettings.h   |  2 ++
 src/Storages/MergeTree/MergeTreeSettings.h     |  2 ++
 20 files changed, 145 insertions(+), 22 deletions(-)

diff --git a/src/Compression/CompressedWriteBuffer.cpp b/src/Compression/CompressedWriteBuffer.cpp
index 83c9fbc9573..c3acfcb7da6 100644
--- a/src/Compression/CompressedWriteBuffer.cpp
+++ b/src/Compression/CompressedWriteBuffer.cpp
@@ -55,10 +55,29 @@ void CompressedWriteBuffer::nextImpl()
         out.write(compressed_buffer.data(), compressed_size);
     }
+
+    /// Increase buffer size for next data if adaptive buffer size is used and nextImpl was called because of end of buffer.
+    if (!available() && use_adaptive_buffer_size && memory.size() < adaptive_buffer_max_size)
+    {
+        memory.resize(std::min(memory.size() * 2, adaptive_buffer_max_size));
+        BufferBase::set(memory.data(), memory.size(), 0);
+    }
 }
 
-CompressedWriteBuffer::CompressedWriteBuffer(WriteBuffer & out_, CompressionCodecPtr codec_, size_t buf_size)
-    : BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), codec(std::move(codec_))
+void CompressedWriteBuffer::finalizeImpl()
+{
+    /// Don't try to resize buffer in nextImpl.
+    use_adaptive_buffer_size = false;
+    next();
+}
+
+CompressedWriteBuffer::CompressedWriteBuffer(
+    WriteBuffer & out_, CompressionCodecPtr codec_, size_t buf_size, bool use_adaptive_buffer_size_, size_t adaptive_buffer_initial_size)
+    : BufferWithOwnMemory<WriteBuffer>(use_adaptive_buffer_size_ ? adaptive_buffer_initial_size : buf_size)
+    , out(out_)
+    , codec(std::move(codec_))
+    , use_adaptive_buffer_size(use_adaptive_buffer_size_)
+    , adaptive_buffer_max_size(buf_size)
 {
 }

diff --git a/src/Compression/CompressedWriteBuffer.h b/src/Compression/CompressedWriteBuffer.h
index 6ae1fbee9cc..a3aae6b0c61 100644
--- a/src/Compression/CompressedWriteBuffer.h
+++ b/src/Compression/CompressedWriteBuffer.h
@@ -19,7 +19,9 @@ public:
     explicit CompressedWriteBuffer(
         WriteBuffer & out_,
         CompressionCodecPtr codec_ = CompressionCodecFactory::instance().getDefaultCodec(),
-        size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
+        size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
+        bool use_adaptive_buffer_size_ = false,
+        size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE);
 
     ~CompressedWriteBuffer() override;
 
@@ -45,10 +47,17 @@ public:
 
 private:
     void nextImpl() override;
+    void finalizeImpl() override;
 
     WriteBuffer & out;
     CompressionCodecPtr codec;
 
+    /// If true, the size of internal buffer will be exponentially increased up to
+    /// adaptive_buffer_max_size after each nextImpl call. It can be used to avoid
+    /// large buffer allocation when actual size of writen data is small.
+    bool use_adaptive_buffer_size;
+    size_t adaptive_buffer_max_size;
+
     PODArray<char> compressed_buffer;
 };

diff --git a/src/Core/Defines.h b/src/Core/Defines.h
index 6df335a9c8f..629ec58a936 100644
--- a/src/Core/Defines.h
+++ b/src/Core/Defines.h
@@ -20,6 +20,9 @@ static constexpr auto DBMS_DEFAULT_POLL_INTERVAL = 10;
 /// The size of the I/O buffer by default.
 static constexpr auto DBMS_DEFAULT_BUFFER_SIZE = 1048576ULL;
 
+/// The initial size of adaptive I/O buffer by default.
+static constexpr auto DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE = 16384ULL;
+
 static constexpr auto PADDING_FOR_SIMD = 64;
 
 /** Which blocks by default read the data (by number of rows).

diff --git a/src/DataTypes/Serializations/ISerialization.cpp b/src/DataTypes/Serializations/ISerialization.cpp
index 338edc3a144..81efa921c9e 100644
--- a/src/DataTypes/Serializations/ISerialization.cpp
+++ b/src/DataTypes/Serializations/ISerialization.cpp
@@ -420,6 +420,20 @@ bool ISerialization::isEphemeralSubcolumn(const DB::ISerialization::SubstreamPat
     return path[last_elem].type == Substream::VariantElementNullMap;
 }
 
+bool ISerialization::isDynamicSubcolumn(const DB::ISerialization::SubstreamPath & path, size_t prefix_len)
+{
+    if (prefix_len == 0 || prefix_len > path.size())
+        return false;
+
+    for (size_t i = 0; i != prefix_len; ++i)
+    {
+        if (path[i].type == SubstreamType::DynamicData || path[i].type == SubstreamType::ObjectData)
+            return true;
+    }
+
+    return false;
+}
+
 ISerialization::SubstreamData ISerialization::createFromPath(const SubstreamPath & path, size_t prefix_len)
 {
     assert(prefix_len <= path.size());

diff --git a/src/DataTypes/Serializations/ISerialization.h b/src/DataTypes/Serializations/ISerialization.h
index 33575a07177..32f418e9132 100644
--- a/src/DataTypes/Serializations/ISerialization.h
+++ b/src/DataTypes/Serializations/ISerialization.h
@@ -457,6 +457,9 @@ public:
     /// for writing/reading data. For example, it's a null-map subcolumn of Variant type (it's always constructed from discriminators);.
     static bool isEphemeralSubcolumn(const SubstreamPath & path, size_t prefix_len);
 
+    /// Returns true if stream with specified path corresponds to dynamic subcolumn.
+    static bool isDynamicSubcolumn(const SubstreamPath & path, size_t prefix_len);
+
 protected:
     template <typename State>
     State * checkAndGetState(const StatePtr & state) const;
 
diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp
index d1f0a928b1d..03b8140ce5c 100644
--- a/src/Disks/DiskLocal.cpp
+++ b/src/Disks/DiskLocal.cpp
@@ -23,6 +23,8 @@
 #include
 #include
 #include
+#include
+#include
 
 #include
 
@@ -339,7 +341,15 @@ DiskLocal::writeFile(const String & path, size_t buf_size, WriteMode mode, const
 {
     int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1;
     return std::make_unique<WriteBufferFromFile>(
-        fs::path(disk_path) / path, buf_size, flags, settings.local_throttler);
+        fs::path(disk_path) / path,
+        buf_size,
+        flags,
+        settings.local_throttler,
+        0666,
+        nullptr,
+        0,
+        settings.use_adaptive_write_buffer,
+        settings.adaptive_write_buffer_initial_size);
 }
 
 std::vector<String> DiskLocal::getBlobPath(const String & path) const

diff --git a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp
index 60fa2997c50..dcd625f7ee0 100644
--- a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp
+++ b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp
@@ -251,12 +251,29 @@ void WriteBufferFromAzureBlobStorage::allocateBuffer()
     auto size = buffer_allocation_policy->getBufferSize();
 
     if (buffer_allocation_policy->getBufferNumber() == 1)
-        size = std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), size);
+    {
+        allocateFirstBuffer();
+        return;
+    }
 
     memory = Memory(size);
     WriteBuffer::set(memory.data(), memory.size());
 }
 
+void WriteBufferFromAzureBlobStorage::allocateFirstBuffer()
+{
+    /// First buffer was already allocated in BufferWithOwnMemory constructor with buffer size provided in constructor.
+    /// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy.
+    /// But it may happen that buffer size provided in constructor is larger then desired buffer size from buffer_allocation_policy.
+    /// Resize memory in this case to the desired size.
+    const auto max_first_buffer = buffer_allocation_policy->getBufferSize();
+    if (memory.size() > max_first_buffer)
+    {
+        memory.resize(max_first_buffer);
+        WriteBuffer::set(memory.data(), memory.size());
+    }
+}
+
 void WriteBufferFromAzureBlobStorage::detachBuffer()
 {
     size_t data_size = size_t(position() - memory.data());

diff --git a/src/Disks/IO/WriteBufferFromAzureBlobStorage.h b/src/Disks/IO/WriteBufferFromAzureBlobStorage.h
index 3ee497c4e44..c2d65928cfa 100644
--- a/src/Disks/IO/WriteBufferFromAzureBlobStorage.h
+++ b/src/Disks/IO/WriteBufferFromAzureBlobStorage.h
@@ -53,6 +53,7 @@ private:
     void detachBuffer();
     void reallocateFirstBuffer();
     void allocateBuffer();
+    void allocateFirstBuffer();
     void hidePartialData();
 
     void setFakeBufferWhenPreFinalized();

diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
index 8de80971238..f26a3a8bd9d 100644
--- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
+++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
@@ -282,7 +282,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLINT
         client.get(),
         uri.bucket,
         object.remote_path,
-        buf_size,
+        write_settings.use_adaptive_write_buffer ?
write_settings.adaptive_write_buffer_initial_size : buf_size, request_settings, std::move(blob_storage_log), attributes, diff --git a/src/IO/WriteBufferFromFile.cpp b/src/IO/WriteBufferFromFile.cpp index f1825ce1e22..d68203029c1 100644 --- a/src/IO/WriteBufferFromFile.cpp +++ b/src/IO/WriteBufferFromFile.cpp @@ -32,8 +32,10 @@ WriteBufferFromFile::WriteBufferFromFile( ThrottlerPtr throttler_, mode_t mode, char * existing_memory, - size_t alignment) - : WriteBufferFromFileDescriptor(-1, buf_size, existing_memory, throttler_, alignment, file_name_) + size_t alignment, + bool use_adaptive_buffer_size_, + size_t adaptive_buffer_initial_size) + : WriteBufferFromFileDescriptor(-1, buf_size, existing_memory, throttler_, alignment, file_name_, use_adaptive_buffer_size_, adaptive_buffer_initial_size) { ProfileEvents::increment(ProfileEvents::FileOpen); @@ -66,8 +68,10 @@ WriteBufferFromFile::WriteBufferFromFile( size_t buf_size, ThrottlerPtr throttler_, char * existing_memory, - size_t alignment) - : WriteBufferFromFileDescriptor(fd_, buf_size, existing_memory, throttler_, alignment, original_file_name) + size_t alignment, + bool use_adaptive_buffer_size_, + size_t adaptive_buffer_initial_size) + : WriteBufferFromFileDescriptor(fd_, buf_size, existing_memory, throttler_, alignment, original_file_name, use_adaptive_buffer_size_, adaptive_buffer_initial_size) { fd_ = -1; } diff --git a/src/IO/WriteBufferFromFile.h b/src/IO/WriteBufferFromFile.h index 57847d893af..c0fa7f0b233 100644 --- a/src/IO/WriteBufferFromFile.h +++ b/src/IO/WriteBufferFromFile.h @@ -36,7 +36,9 @@ public: ThrottlerPtr throttler_ = {}, mode_t mode = 0666, char * existing_memory = nullptr, - size_t alignment = 0); + size_t alignment = 0, + bool use_adaptive_buffer_size_ = false, + size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE); /// Use pre-opened file descriptor. explicit WriteBufferFromFile( @@ -45,7 +47,9 @@ public: size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, ThrottlerPtr throttler_ = {}, char * existing_memory = nullptr, - size_t alignment = 0); + size_t alignment = 0, + bool use_adaptive_buffer_size_ = false, + size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE); ~WriteBufferFromFile() override; diff --git a/src/IO/WriteBufferFromFileDescriptor.cpp b/src/IO/WriteBufferFromFileDescriptor.cpp index f1207edc55b..b60a792e11c 100644 --- a/src/IO/WriteBufferFromFileDescriptor.cpp +++ b/src/IO/WriteBufferFromFileDescriptor.cpp @@ -83,6 +83,13 @@ void WriteBufferFromFileDescriptor::nextImpl() ProfileEvents::increment(ProfileEvents::DiskWriteElapsedMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::WriteBufferFromFileDescriptorWriteBytes, bytes_written); + + /// Increase buffer size for next data if adaptive buffer size is used and nextImpl was called because of end of buffer. 
+ if (!available() && use_adaptive_buffer_size && memory.size() < adaptive_max_buffer_size) + { + memory.resize(std::min(memory.size() * 2, adaptive_max_buffer_size)); + BufferBase::set(memory.data(), memory.size(), 0); + } } /// NOTE: This class can be used as a very low-level building block, for example @@ -94,11 +101,15 @@ WriteBufferFromFileDescriptor::WriteBufferFromFileDescriptor( char * existing_memory, ThrottlerPtr throttler_, size_t alignment, - std::string file_name_) - : WriteBufferFromFileBase(buf_size, existing_memory, alignment) + std::string file_name_, + bool use_adaptive_buffer_size_, + size_t adaptive_buffer_initial_size) + : WriteBufferFromFileBase(use_adaptive_buffer_size_ ? adaptive_buffer_initial_size : buf_size, existing_memory, alignment) , fd(fd_) , throttler(throttler_) , file_name(std::move(file_name_)) + , use_adaptive_buffer_size(use_adaptive_buffer_size_) + , adaptive_max_buffer_size(buf_size) { } @@ -124,6 +135,7 @@ void WriteBufferFromFileDescriptor::finalizeImpl() return; } + use_adaptive_buffer_size = false; next(); } diff --git a/src/IO/WriteBufferFromFileDescriptor.h b/src/IO/WriteBufferFromFileDescriptor.h index cb73b1e1d08..1008c7cd8d9 100644 --- a/src/IO/WriteBufferFromFileDescriptor.h +++ b/src/IO/WriteBufferFromFileDescriptor.h @@ -18,7 +18,9 @@ public: char * existing_memory = nullptr, ThrottlerPtr throttler_ = {}, size_t alignment = 0, - std::string file_name_ = ""); + std::string file_name_ = "", + bool use_adaptive_buffer_size_ = false, + size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE); /** Could be used before initialization if needed 'fd' was not passed to constructor. * It's not possible to change 'fd' during work. @@ -56,6 +58,12 @@ protected: /// If file has name contains filename, otherwise contains string "(fd=...)" std::string file_name; + /// If true, the size of internal buffer will be exponentially increased up to + /// adaptive_buffer_max_size after each nextImpl call. It can be used to avoid + /// large buffer allocation when actual size of writen data is small. + bool use_adaptive_buffer_size; + size_t adaptive_max_buffer_size; + void finalizeImpl() override; }; diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index e702b4d35ad..d88e393e4e1 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -364,10 +364,16 @@ void WriteBufferFromS3::allocateBuffer() void WriteBufferFromS3::allocateFirstBuffer() { + /// First buffer was already allocated in BufferWithOwnMemory constructor with provided in constructor buffer size. + /// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy. + /// But it may happen that buffer size provided in constructor is larger then desired buffer size from buffer_allocation_policy. + /// Resize memory in this case to the desired size. 
const auto max_first_buffer = buffer_allocation_policy->getBufferSize(); - const auto size = std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), max_first_buffer); - memory = Memory(size); - WriteBuffer::set(memory.data(), memory.size()); + if (memory.size() > max_first_buffer) + { + memory.resize(max_first_buffer); + WriteBuffer::set(memory.data(), memory.size()); + } } void WriteBufferFromS3::setFakeBufferWhenPreFinalized() diff --git a/src/IO/WriteSettings.h b/src/IO/WriteSettings.h index 84bb25439b5..ce78a3fd26b 100644 --- a/src/IO/WriteSettings.h +++ b/src/IO/WriteSettings.h @@ -25,6 +25,9 @@ struct WriteSettings bool s3_allow_parallel_part_upload = true; bool azure_allow_parallel_part_upload = true; + bool use_adaptive_write_buffer = false; + size_t adaptive_write_buffer_initial_size = 16 * 1024; + bool operator==(const WriteSettings & other) const = default; }; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp index b0e70e94b73..9bfc87135d9 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp @@ -85,11 +85,11 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream( marks_file_extension{marks_file_extension_}, plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)), plain_hashing(*plain_file), - compressor(plain_hashing, compression_codec_, max_compress_block_size_), + compressor(plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size), compressed_hashing(compressor), marks_file(data_part_storage->writeFile(marks_path_ + marks_file_extension, 4096, query_write_settings)), marks_hashing(*marks_file), - marks_compressor(marks_hashing, marks_compression_codec_, marks_compress_block_size_), + marks_compressor(marks_hashing, marks_compression_codec_, marks_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size), marks_compressed_hashing(marks_compressor), compress_marks(MarkType(marks_file_extension).compressed) { @@ -108,7 +108,7 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream( data_file_extension{data_file_extension_}, plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)), plain_hashing(*plain_file), - compressor(plain_hashing, compression_codec_, max_compress_block_size_), + compressor(plain_hashing, compression_codec_, max_compress_block_size_, query_write_settings.use_adaptive_write_buffer, query_write_settings.adaptive_write_buffer_initial_size), compressed_hashing(compressor), compress_marks(false) { diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 8b6735e0fe2..f050accd7a1 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -177,6 +177,10 @@ void MergeTreeDataPartWriterWide::addStreams( if (!max_compress_block_size) max_compress_block_size = settings.max_compress_block_size; + WriteSettings query_write_settings = settings.query_write_settings; + query_write_settings.use_adaptive_write_buffer = settings.use_adaptive_write_buffer_for_dynamic_subcolumns && ISerialization::isDynamicSubcolumn(substream_path, substream_path.size()); + 
query_write_settings.adaptive_write_buffer_initial_size = settings.adaptive_write_buffer_initial_size;
+
     column_streams[stream_name] = std::make_unique<MergeTreeDataPartWriterOnDisk::Stream<false>>(
         stream_name,
         data_part_storage,
@@ -186,7 +190,7 @@ void MergeTreeDataPartWriterWide::addStreams(
         max_compress_block_size,
         marks_compression_codec,
         settings.marks_compress_block_size,
-        settings.query_write_settings);
+        query_write_settings);
 
     full_name_to_stream_name.emplace(full_stream_name, stream_name);
     stream_name_to_full_name.emplace(stream_name, full_stream_name);

diff --git a/src/Storages/MergeTree/MergeTreeIOSettings.cpp b/src/Storages/MergeTree/MergeTreeIOSettings.cpp
index 24cb25afe47..19365a90a14 100644
--- a/src/Storages/MergeTree/MergeTreeIOSettings.cpp
+++ b/src/Storages/MergeTree/MergeTreeIOSettings.cpp
@@ -30,6 +30,8 @@ MergeTreeWriterSettings::MergeTreeWriterSettings(
     , low_cardinality_max_dictionary_size(global_settings.low_cardinality_max_dictionary_size)
     , low_cardinality_use_single_dictionary_for_part(global_settings.low_cardinality_use_single_dictionary_for_part != 0)
     , use_compact_variant_discriminators_serialization(storage_settings->use_compact_variant_discriminators_serialization)
+    , use_adaptive_write_buffer_for_dynamic_subcolumns(storage_settings->use_adaptive_write_buffer_for_dynamic_subcolumns)
+    , adaptive_write_buffer_initial_size(storage_settings->adaptive_write_buffer_initial_size)
 {
 }

diff --git a/src/Storages/MergeTree/MergeTreeIOSettings.h b/src/Storages/MergeTree/MergeTreeIOSettings.h
index 47b174b2e29..fcc72815d8f 100644
--- a/src/Storages/MergeTree/MergeTreeIOSettings.h
+++ b/src/Storages/MergeTree/MergeTreeIOSettings.h
@@ -80,6 +80,8 @@ struct MergeTreeWriterSettings
     size_t low_cardinality_max_dictionary_size;
     bool low_cardinality_use_single_dictionary_for_part;
     bool use_compact_variant_discriminators_serialization;
+    bool use_adaptive_write_buffer_for_dynamic_subcolumns;
+    size_t adaptive_write_buffer_initial_size;
 };
 
 }

diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h
index 0769b60dc6b..dcb18155114 100644
--- a/src/Storages/MergeTree/MergeTreeSettings.h
+++ b/src/Storages/MergeTree/MergeTreeSettings.h
@@ -99,6 +99,8 @@ struct Settings;
     M(Bool, add_implicit_sign_column_constraint_for_collapsing_engine, false, "If true, add implicit constraint for sign column for CollapsingMergeTree engine.", 0) \
     M(Milliseconds, sleep_before_commit_local_part_in_replicated_table_ms, 0, "For testing. Do not change it.", 0) \
     M(Bool, optimize_row_order, false, "Allow reshuffling of rows during part inserts and merges to improve the compressibility of the new part", 0) \
+    M(Bool, use_adaptive_write_buffer_for_dynamic_subcolumns, true, "Allow to use adaptive writer buffers during writing dynamic subcolumns to reduce memory usage", 0) \
+    M(UInt64, adaptive_write_buffer_initial_size, 16 * 1024, "Initial size of an adaptive write buffer", 0) \
     \
     /* Part removal settings. */ \
     M(UInt64, simultaneous_parts_removal_limit, 0, "Maximum number of parts to remove during one CleanupThread iteration (0 means unlimited).", 0) \
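Patch 07 enables the adaptive mode only for streams whose substream path passes through dynamic or object data (the ISerialization::isDynamicSubcolumn check above); patch 10 below widens the check to the structure streams as well. A standalone sketch of that prefix scan (illustrative enum and names, not the real Substream types):

#include <cstdio>
#include <vector>

// Sketch of the prefix scan in ISerialization::isDynamicSubcolumn:
// a stream is "dynamic" if any node in the inspected prefix of its
// substream path is dynamic/object data.
enum class SubstreamType { Regular, DynamicData, ObjectData };

static bool isDynamicSubcolumn(const std::vector<SubstreamType> & path, size_t prefix_len)
{
    if (prefix_len == 0 || prefix_len > path.size())
        return false;

    for (size_t i = 0; i != prefix_len; ++i)
        if (path[i] == SubstreamType::DynamicData || path[i] == SubstreamType::ObjectData)
            return true;

    return false;
}

int main()
{
    std::vector<SubstreamType> path{SubstreamType::Regular, SubstreamType::DynamicData};
    std::printf("%d\n", isDynamicSubcolumn(path, path.size()));   // 1: dynamic subcolumn
    std::printf("%d\n", isDynamicSubcolumn(path, 1));             // 0: prefix has no dynamic node
    return 0;
}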
From 68a8b5a3a1e70469ff47f81669a5b25d1d40fb96 Mon Sep 17 00:00:00 2001
From: avogar
Date: Wed, 4 Sep 2024 18:22:20 +0000
Subject: [PATCH 08/24] Better

---
 src/Disks/DiskLocal.cpp                                    | 2 --
 .../ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp | 2 +-
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp
index 03b8140ce5c..12a5b615234 100644
--- a/src/Disks/DiskLocal.cpp
+++ b/src/Disks/DiskLocal.cpp
@@ -23,8 +23,6 @@
 #include
 #include
 #include
-#include
-#include
 
 #include
 
diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp
index f85b5f45b37..fa48825e1a6 100644
--- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp
+++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp
@@ -289,7 +289,7 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NOLINT
     return std::make_unique<WriteBufferFromAzureBlobStorage>(
         client.get(),
         object.remote_path,
-        buf_size,
+        write_settings.use_adaptive_write_buffer ? write_settings.adaptive_write_buffer_initial_size : buf_size,
         patchSettings(write_settings),
         settings.get(),
         std::move(scheduler));

From 0dad8b088a0452831c89b88b470b8df963fd741a Mon Sep 17 00:00:00 2001
From: avogar
Date: Wed, 4 Sep 2024 18:35:10 +0000
Subject: [PATCH 09/24] Fix typo

---
 src/Compression/CompressedWriteBuffer.h | 2 +-
 src/IO/WriteBufferFromFileDescriptor.h  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/Compression/CompressedWriteBuffer.h b/src/Compression/CompressedWriteBuffer.h
index a3aae6b0c61..41596703bfe 100644
--- a/src/Compression/CompressedWriteBuffer.h
+++ b/src/Compression/CompressedWriteBuffer.h
@@ -54,7 +54,7 @@ private:
 
     /// If true, the size of internal buffer will be exponentially increased up to
     /// adaptive_buffer_max_size after each nextImpl call. It can be used to avoid
-    /// large buffer allocation when actual size of writen data is small.
+    /// large buffer allocation when actual size of written data is small.
     bool use_adaptive_buffer_size;
     size_t adaptive_buffer_max_size;
 
diff --git a/src/IO/WriteBufferFromFileDescriptor.h b/src/IO/WriteBufferFromFileDescriptor.h
index 1008c7cd8d9..e893ecd80fb 100644
--- a/src/IO/WriteBufferFromFileDescriptor.h
+++ b/src/IO/WriteBufferFromFileDescriptor.h
@@ -60,7 +60,7 @@ protected:
 
     /// If true, the size of internal buffer will be exponentially increased up to
     /// adaptive_buffer_max_size after each nextImpl call. It can be used to avoid
-    /// large buffer allocation when actual size of writen data is small.
+    /// large buffer allocation when actual size of written data is small.
bool use_adaptive_buffer_size; size_t adaptive_max_buffer_size; From b7b88737ad996b43e27c22e697b9a947fff2a3c0 Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 4 Sep 2024 18:37:33 +0000 Subject: [PATCH 10/24] Treat dynamic/object structure streams as dynamic --- src/DataTypes/Serializations/ISerialization.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/DataTypes/Serializations/ISerialization.cpp b/src/DataTypes/Serializations/ISerialization.cpp index 81efa921c9e..dcf637c7d2b 100644 --- a/src/DataTypes/Serializations/ISerialization.cpp +++ b/src/DataTypes/Serializations/ISerialization.cpp @@ -427,7 +427,8 @@ bool ISerialization::isDynamicSubcolumn(const DB::ISerialization::SubstreamPath for (size_t i = 0; i != prefix_len; ++i) { - if (path[i].type == SubstreamType::DynamicData || path[i].type == SubstreamType::ObjectData) + if (path[i].type == SubstreamType::DynamicData || path[i].type == SubstreamType::DynamicStructure + || path[i].type == SubstreamType::ObjectData || path[i].type == SubstreamType::ObjectStructure) return true; } From b4ef10ad1c075b5c16cafb911f67d4623cd40ceb Mon Sep 17 00:00:00 2001 From: avogar Date: Thu, 5 Sep 2024 08:52:22 +0000 Subject: [PATCH 11/24] Make better --- .../IO/WriteBufferFromAzureBlobStorage.cpp | 30 +++++-------------- .../IO/WriteBufferFromAzureBlobStorage.h | 1 - src/IO/WriteBufferFromS3.cpp | 21 ++----------- src/IO/WriteBufferFromS3.h | 1 - 4 files changed, 10 insertions(+), 43 deletions(-) diff --git a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp index dcd625f7ee0..cbcfe0bdb97 100644 --- a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp +++ b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp @@ -59,7 +59,7 @@ WriteBufferFromAzureBlobStorage::WriteBufferFromAzureBlobStorage( const WriteSettings & write_settings_, std::shared_ptr settings_, ThreadPoolCallbackRunnerUnsafe schedule_) - : WriteBufferFromFileBase(buf_size_, nullptr, 0) + : WriteBufferFromFileBase(std::min(buf_size_, static_cast(DBMS_DEFAULT_BUFFER_SIZE)), nullptr, 0) , log(getLogger("WriteBufferFromAzureBlobStorage")) , buffer_allocation_policy(createBufferAllocationPolicy(*settings_)) , max_single_part_upload_size(settings_->max_single_part_upload_size) @@ -248,30 +248,14 @@ void WriteBufferFromAzureBlobStorage::allocateBuffer() buffer_allocation_policy->nextBuffer(); chassert(0 == hidden_size); - auto size = buffer_allocation_policy->getBufferSize(); - - if (buffer_allocation_policy->getBufferNumber() == 1) - { - allocateFirstBuffer(); - return; - } - - memory = Memory(size); - WriteBuffer::set(memory.data(), memory.size()); -} - -void WriteBufferFromAzureBlobStorage::allocateFirstBuffer() -{ /// First buffer was already allocated in BufferWithOwnMemory constructor with buffer size provided in constructor. /// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy. - /// But it may happen that buffer size provided in constructor is larger then desired buffer size from buffer_allocation_policy. - /// Resize memory in this case to the desired size. 
- const auto max_first_buffer = buffer_allocation_policy->getBufferSize(); - if (memory.size() > max_first_buffer) - { - memory.resize(max_first_buffer); - WriteBuffer::set(memory.data(), memory.size()); - } + if (buffer_allocation_policy->getBufferNumber() == 1) + return; + + auto size = buffer_allocation_policy->getBufferSize(); + memory = Memory(size); + WriteBuffer::set(memory.data(), memory.size()); } void WriteBufferFromAzureBlobStorage::detachBuffer() diff --git a/src/Disks/IO/WriteBufferFromAzureBlobStorage.h b/src/Disks/IO/WriteBufferFromAzureBlobStorage.h index c2d65928cfa..3ee497c4e44 100644 --- a/src/Disks/IO/WriteBufferFromAzureBlobStorage.h +++ b/src/Disks/IO/WriteBufferFromAzureBlobStorage.h @@ -53,7 +53,6 @@ private: void detachBuffer(); void reallocateFirstBuffer(); void allocateBuffer(); - void allocateFirstBuffer(); void hidePartialData(); void setFakeBufferWhenPreFinalized(); diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index d88e393e4e1..424708cf2c8 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -96,7 +96,7 @@ WriteBufferFromS3::WriteBufferFromS3( std::optional> object_metadata_, ThreadPoolCallbackRunnerUnsafe schedule_, const WriteSettings & write_settings_) - : WriteBufferFromFileBase(buf_size_, nullptr, 0) + : WriteBufferFromFileBase(std::min(buf_size_, static_cast(DBMS_DEFAULT_BUFFER_SIZE)), nullptr, 0) , bucket(bucket_) , key(key_) , request_settings(request_settings_) @@ -352,30 +352,15 @@ void WriteBufferFromS3::allocateBuffer() buffer_allocation_policy->nextBuffer(); chassert(0 == hidden_size); + /// First buffer was already allocated in BufferWithOwnMemory constructor with provided in constructor buffer size. + /// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy. if (buffer_allocation_policy->getBufferNumber() == 1) - { - allocateFirstBuffer(); return; - } memory = Memory(buffer_allocation_policy->getBufferSize()); WriteBuffer::set(memory.data(), memory.size()); } -void WriteBufferFromS3::allocateFirstBuffer() -{ - /// First buffer was already allocated in BufferWithOwnMemory constructor with provided in constructor buffer size. - /// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy. - /// But it may happen that buffer size provided in constructor is larger then desired buffer size from buffer_allocation_policy. - /// Resize memory in this case to the desired size. 
- const auto max_first_buffer = buffer_allocation_policy->getBufferSize(); - if (memory.size() > max_first_buffer) - { - memory.resize(max_first_buffer); - WriteBuffer::set(memory.data(), memory.size()); - } -} - void WriteBufferFromS3::setFakeBufferWhenPreFinalized() { WriteBuffer::set(fake_buffer_when_prefinalized, sizeof(fake_buffer_when_prefinalized)); diff --git a/src/IO/WriteBufferFromS3.h b/src/IO/WriteBufferFromS3.h index b026da607c5..604f036fcb8 100644 --- a/src/IO/WriteBufferFromS3.h +++ b/src/IO/WriteBufferFromS3.h @@ -64,7 +64,6 @@ private: void reallocateFirstBuffer(); void detachBuffer(); void allocateBuffer(); - void allocateFirstBuffer(); void setFakeBufferWhenPreFinalized(); S3::UploadPartRequest getUploadRequest(size_t part_number, PartData & data); From 33866fb5bd8c682abcd47593aa8d717f2611a3c0 Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Sun, 8 Sep 2024 08:38:05 +0000 Subject: [PATCH 12/24] Fix config.h for v3.7.1 --- contrib/libarchive-cmake/config.h | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/contrib/libarchive-cmake/config.h b/contrib/libarchive-cmake/config.h index 0b0cab47a52..6e6870e5d70 100644 --- a/contrib/libarchive-cmake/config.h +++ b/contrib/libarchive-cmake/config.h @@ -334,13 +334,13 @@ typedef uint64_t uintmax_t; /* #undef ARCHIVE_XATTR_LINUX */ /* Version number of bsdcpio */ -#define BSDCPIO_VERSION_STRING "3.7.0" +#define BSDCPIO_VERSION_STRING "3.7.1" /* Version number of bsdtar */ -#define BSDTAR_VERSION_STRING "3.7.0" +#define BSDTAR_VERSION_STRING "3.7.1" /* Version number of bsdcat */ -#define BSDCAT_VERSION_STRING "3.7.0" +#define BSDCAT_VERSION_STRING "3.7.1" /* Define to 1 if you have the `acl_create_entry' function. */ /* #undef HAVE_ACL_CREATE_ENTRY */ @@ -642,6 +642,9 @@ typedef uint64_t uintmax_t; /* Define to 1 if you have the `getgrnam_r' function. */ #define HAVE_GETGRNAM_R 1 +/* Define to 1 if you have the `getline' function. */ +#define HAVE_GETLINE 1 + /* Define to 1 if platform uses `optreset` to reset `getopt` */ #define HAVE_GETOPT_OPTRESET 1 @@ -1273,13 +1276,13 @@ typedef uint64_t uintmax_t; /* #undef HAVE__MKGMTIME */ /* Define as const if the declaration of iconv() needs const. */ -#define ICONV_CONST +#define ICONV_CONST /* Version number of libarchive as a single integer */ -#define LIBARCHIVE_VERSION_NUMBER "3007000" +#define LIBARCHIVE_VERSION_NUMBER "3007001" /* Version number of libarchive */ -#define LIBARCHIVE_VERSION_STRING "3.7.0" +#define LIBARCHIVE_VERSION_STRING "3.7.1" /* Define to 1 if `lstat' dereferences a symlink specified with a trailing slash. */ @@ -1333,7 +1336,7 @@ typedef uint64_t uintmax_t; #endif /* SAFE_TO_DEFINE_EXTENSIONS */ /* Version number of package */ -#define VERSION "3.7.0" +#define VERSION "3.7.1" /* Number of bits in a file offset, on hosts where this is settable. 
*/ /* #undef _FILE_OFFSET_BITS */ From 483dd7eebea0cd002a8c2db0fa364437728b3081 Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Sun, 8 Sep 2024 08:45:13 +0000 Subject: [PATCH 13/24] Bump to v3.7.2 --- contrib/libarchive | 2 +- contrib/libarchive-cmake/config.h | 24 +++++++++++++++--------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/contrib/libarchive b/contrib/libarchive index 0c21691b177..6468cd1f5c9 160000 --- a/contrib/libarchive +++ b/contrib/libarchive @@ -1 +1 @@ -Subproject commit 0c21691b177fac5f4cceca2a1ff2ddfa5d60f51c +Subproject commit 6468cd1f5c9b76e2c3b10cdd6938faf6b82823b6 diff --git a/contrib/libarchive-cmake/config.h b/contrib/libarchive-cmake/config.h index 6e6870e5d70..ad5d83168dc 100644 --- a/contrib/libarchive-cmake/config.h +++ b/contrib/libarchive-cmake/config.h @@ -334,13 +334,16 @@ typedef uint64_t uintmax_t; /* #undef ARCHIVE_XATTR_LINUX */ /* Version number of bsdcpio */ -#define BSDCPIO_VERSION_STRING "3.7.1" +#define BSDCPIO_VERSION_STRING "3.7.2" /* Version number of bsdtar */ -#define BSDTAR_VERSION_STRING "3.7.1" +#define BSDTAR_VERSION_STRING "3.7.2" /* Version number of bsdcat */ -#define BSDCAT_VERSION_STRING "3.7.1" +#define BSDCAT_VERSION_STRING "3.7.2" + +/* Version number of bsdunzip */ +#define BSDUNZIP_VERSION_STRING "3.7.2" /* Define to 1 if you have the `acl_create_entry' function. */ /* #undef HAVE_ACL_CREATE_ENTRY */ @@ -645,9 +648,6 @@ typedef uint64_t uintmax_t; /* Define to 1 if you have the `getline' function. */ #define HAVE_GETLINE 1 -/* Define to 1 if platform uses `optreset` to reset `getopt` */ -#define HAVE_GETOPT_OPTRESET 1 - /* Define to 1 if you have the `getpid' function. */ #define HAVE_GETPID 1 @@ -1032,6 +1032,12 @@ typedef uint64_t uintmax_t; /* Define to 1 if you have the `strrchr' function. */ #define HAVE_STRRCHR 1 +/* Define to 1 if the system has the type `struct statfs'. */ +/* #undef HAVE_STRUCT_STATFS */ + +/* Define to 1 if `f_iosize' is a member of `struct statfs'. */ +/* #undef HAVE_STRUCT_STATFS_F_IOSIZE */ + /* Define to 1 if `f_namemax' is a member of `struct statfs'. */ /* #undef HAVE_STRUCT_STATFS_F_NAMEMAX */ @@ -1279,10 +1285,10 @@ typedef uint64_t uintmax_t; #define ICONV_CONST /* Version number of libarchive as a single integer */ -#define LIBARCHIVE_VERSION_NUMBER "3007001" +#define LIBARCHIVE_VERSION_NUMBER "3007002" /* Version number of libarchive */ -#define LIBARCHIVE_VERSION_STRING "3.7.1" +#define LIBARCHIVE_VERSION_STRING "3.7.2" /* Define to 1 if `lstat' dereferences a symlink specified with a trailing slash. */ @@ -1336,7 +1342,7 @@ typedef uint64_t uintmax_t; #endif /* SAFE_TO_DEFINE_EXTENSIONS */ /* Version number of package */ -#define VERSION "3.7.1" +#define VERSION "3.7.2" /* Number of bits in a file offset, on hosts where this is settable. 
*/ /* #undef _FILE_OFFSET_BITS */ From 66572aa029122c433acd743fdf77c4585cee7d8d Mon Sep 17 00:00:00 2001 From: Nikita Fomichev Date: Fri, 6 Sep 2024 16:13:08 +0200 Subject: [PATCH 14/24] Fix 24.8 setting compatibility `rows_before_aggregation` https://github.com/ClickHouse/ClickHouse/pull/66084 --- src/Core/SettingsChangesHistory.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 258065dcfd4..ed1b1d4f177 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -81,7 +81,7 @@ static std::initializer_list Date: Mon, 9 Sep 2024 15:12:17 +0000 Subject: [PATCH 15/24] Fix unit tests --- src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp | 9 +++++++++ src/IO/WriteBufferFromS3.cpp | 9 +++++++++ 2 files changed, 18 insertions(+) diff --git a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp index cbcfe0bdb97..229b36a05f6 100644 --- a/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp +++ b/src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp @@ -251,7 +251,16 @@ void WriteBufferFromAzureBlobStorage::allocateBuffer() /// First buffer was already allocated in BufferWithOwnMemory constructor with buffer size provided in constructor. /// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy. if (buffer_allocation_policy->getBufferNumber() == 1) + { + /// Reduce memory size if initial size was larger then desired size from buffer_allocation_policy. + /// Usually it doesn't happen but we have it in unit tests. + if (memory.size() > buffer_allocation_policy->getBufferSize()) + { + memory.resize(buffer_allocation_policy->getBufferSize()); + WriteBuffer::set(memory.data(), memory.size()); + } return; + } auto size = buffer_allocation_policy->getBufferSize(); memory = Memory(size); diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index 424708cf2c8..3a4ab3ee882 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -355,7 +355,16 @@ void WriteBufferFromS3::allocateBuffer() /// First buffer was already allocated in BufferWithOwnMemory constructor with provided in constructor buffer size. /// It will be reallocated in subsequent nextImpl calls up to the desired buffer size from buffer_allocation_policy. if (buffer_allocation_policy->getBufferNumber() == 1) + { + /// Reduce memory size if initial size was larger then desired size from buffer_allocation_policy. + /// Usually it doesn't happen but we have it in unit tests. 
+ if (memory.size() > buffer_allocation_policy->getBufferSize()) + { + memory.resize(buffer_allocation_policy->getBufferSize()); + WriteBuffer::set(memory.data(), memory.size()); + } return; + } memory = Memory(buffer_allocation_policy->getBufferSize()); WriteBuffer::set(memory.data(), memory.size()); From e252bdc30d7b10e257ff63b0288dc808ed717d53 Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Sun, 8 Sep 2024 12:57:37 +0000 Subject: [PATCH 16/24] Bump to v3.7.3 --- contrib/libarchive | 2 +- contrib/libarchive-cmake/CMakeLists.txt | 8 +++---- contrib/libarchive-cmake/config.h | 28 ++++++++++++++++--------- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/contrib/libarchive b/contrib/libarchive index 6468cd1f5c9..4fcc02d906c 160000 --- a/contrib/libarchive +++ b/contrib/libarchive @@ -1 +1 @@ -Subproject commit 6468cd1f5c9b76e2c3b10cdd6938faf6b82823b6 +Subproject commit 4fcc02d906cca4b9e21a78a833f1142a2689ec52 diff --git a/contrib/libarchive-cmake/CMakeLists.txt b/contrib/libarchive-cmake/CMakeLists.txt index e89770da5f6..aa6dd9638b6 100644 --- a/contrib/libarchive-cmake/CMakeLists.txt +++ b/contrib/libarchive-cmake/CMakeLists.txt @@ -1,6 +1,6 @@ set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/libarchive") -set(SRCS +set(SRCS "${LIBRARY_DIR}/libarchive/archive_acl.c" "${LIBRARY_DIR}/libarchive/archive_blake2sp_ref.c" "${LIBRARY_DIR}/libarchive/archive_blake2s_ref.c" @@ -135,7 +135,7 @@ set(SRCS ) add_library(_libarchive ${SRCS}) -target_include_directories(_libarchive PUBLIC +target_include_directories(_libarchive PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} "${LIBRARY_DIR}/libarchive" ) @@ -157,7 +157,7 @@ if (TARGET ch_contrib::zlib) endif() if (TARGET ch_contrib::zstd) - target_compile_definitions(_libarchive PUBLIC HAVE_ZSTD_H=1 HAVE_LIBZSTD=1 HAVE_LIBZSTD_COMPRESSOR=1) + target_compile_definitions(_libarchive PUBLIC HAVE_ZSTD_H=1 HAVE_LIBZSTD=1 HAVE_ZSTD_compressStream=1) target_link_libraries(_libarchive PRIVATE ch_contrib::zstd) endif() @@ -179,4 +179,4 @@ if (OS_LINUX) ) endif() -add_library(ch_contrib::libarchive ALIAS _libarchive) \ No newline at end of file +add_library(ch_contrib::libarchive ALIAS _libarchive) diff --git a/contrib/libarchive-cmake/config.h b/contrib/libarchive-cmake/config.h index ad5d83168dc..86a797b575d 100644 --- a/contrib/libarchive-cmake/config.h +++ b/contrib/libarchive-cmake/config.h @@ -334,16 +334,16 @@ typedef uint64_t uintmax_t; /* #undef ARCHIVE_XATTR_LINUX */ /* Version number of bsdcpio */ -#define BSDCPIO_VERSION_STRING "3.7.2" +#define BSDCPIO_VERSION_STRING "3.7.3" /* Version number of bsdtar */ -#define BSDTAR_VERSION_STRING "3.7.2" +#define BSDTAR_VERSION_STRING "3.7.3" /* Version number of bsdcat */ -#define BSDCAT_VERSION_STRING "3.7.2" +#define BSDCAT_VERSION_STRING "3.7.3" /* Version number of bsdunzip */ -#define BSDUNZIP_VERSION_STRING "3.7.2" +#define BSDUNZIP_VERSION_STRING "3.7.3" /* Define to 1 if you have the `acl_create_entry' function. */ /* #undef HAVE_ACL_CREATE_ENTRY */ @@ -753,6 +753,12 @@ typedef uint64_t uintmax_t; /* Define to 1 if you have the `pcreposix' library (-lpcreposix). */ /* #undef HAVE_LIBPCREPOSIX */ +/* Define to 1 if you have the `pcre2-8' library (-lpcre2-8). */ +/* #undef HAVE_LIBPCRE2 */ + +/* Define to 1 if you have the `pcreposix' library (-lpcre2posix). */ +/* #undef HAVE_LIBPCRE2POSIX */ + /* Define to 1 if you have the `xml2' library (-lxml2). */ #define HAVE_LIBXML2 1 @@ -768,9 +774,8 @@ typedef uint64_t uintmax_t; /* Define to 1 if you have the `zstd' library (-lzstd). 
*/ /* #undef HAVE_LIBZSTD */ -/* Define to 1 if you have the `zstd' library (-lzstd) with compression - support. */ -/* #undef HAVE_LIBZSTD_COMPRESSOR */ +/* Define to 1 if you have the ZSTD_compressStream function. */ +/* #undef HAVE_ZSTD_compressStream */ /* Define to 1 if you have the header file. */ #define HAVE_LIMITS_H 1 @@ -926,6 +931,9 @@ typedef uint64_t uintmax_t; /* Define to 1 if you have the header file. */ /* #undef HAVE_PCREPOSIX_H */ +/* Define to 1 if you have the header file. */ +/* #undef HAVE_PCRE2POSIX_H */ + /* Define to 1 if you have the `pipe' function. */ #define HAVE_PIPE 1 @@ -1285,10 +1293,10 @@ typedef uint64_t uintmax_t; #define ICONV_CONST /* Version number of libarchive as a single integer */ -#define LIBARCHIVE_VERSION_NUMBER "3007002" +#define LIBARCHIVE_VERSION_NUMBER "3007003" /* Version number of libarchive */ -#define LIBARCHIVE_VERSION_STRING "3.7.2" +#define LIBARCHIVE_VERSION_STRING "3.7.3" /* Define to 1 if `lstat' dereferences a symlink specified with a trailing slash. */ @@ -1342,7 +1350,7 @@ typedef uint64_t uintmax_t; #endif /* SAFE_TO_DEFINE_EXTENSIONS */ /* Version number of package */ -#define VERSION "3.7.2" +#define VERSION "3.7.3" /* Number of bits in a file offset, on hosts where this is settable. */ /* #undef _FILE_OFFSET_BITS */ From 6464d47d34134d9134b37e404e48a1182695d47b Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Mon, 9 Sep 2024 20:29:41 +0000 Subject: [PATCH 17/24] Bump to v3.7.4 --- contrib/libarchive | 2 +- contrib/libarchive-cmake/config.h | 17 ++++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/contrib/libarchive b/contrib/libarchive index 4fcc02d906c..313aa1fa10b 160000 --- a/contrib/libarchive +++ b/contrib/libarchive @@ -1 +1 @@ -Subproject commit 4fcc02d906cca4b9e21a78a833f1142a2689ec52 +Subproject commit 313aa1fa10b657de791e3202c168a6c833bc3543 diff --git a/contrib/libarchive-cmake/config.h b/contrib/libarchive-cmake/config.h index 86a797b575d..9696cfb112d 100644 --- a/contrib/libarchive-cmake/config.h +++ b/contrib/libarchive-cmake/config.h @@ -334,16 +334,16 @@ typedef uint64_t uintmax_t; /* #undef ARCHIVE_XATTR_LINUX */ /* Version number of bsdcpio */ -#define BSDCPIO_VERSION_STRING "3.7.3" +#define BSDCPIO_VERSION_STRING "3.7.4" /* Version number of bsdtar */ -#define BSDTAR_VERSION_STRING "3.7.3" +#define BSDTAR_VERSION_STRING "3.7.4" /* Version number of bsdcat */ -#define BSDCAT_VERSION_STRING "3.7.3" +#define BSDCAT_VERSION_STRING "3.7.4" /* Version number of bsdunzip */ -#define BSDUNZIP_VERSION_STRING "3.7.3" +#define BSDUNZIP_VERSION_STRING "3.7.4" /* Define to 1 if you have the `acl_create_entry' function. */ /* #undef HAVE_ACL_CREATE_ENTRY */ @@ -1094,6 +1094,9 @@ typedef uint64_t uintmax_t; /* Define to 1 if you have the `symlink' function. */ #define HAVE_SYMLINK 1 +/* Define to 1 if you have the `sysconf' function. */ +#define HAVE_SYSCONF 1 + /* Define to 1 if you have the header file. */ /* #undef HAVE_SYS_ACL_H */ @@ -1293,10 +1296,10 @@ typedef uint64_t uintmax_t; #define ICONV_CONST /* Version number of libarchive as a single integer */ -#define LIBARCHIVE_VERSION_NUMBER "3007003" +#define LIBARCHIVE_VERSION_NUMBER "3007004" /* Version number of libarchive */ -#define LIBARCHIVE_VERSION_STRING "3.7.3" +#define LIBARCHIVE_VERSION_STRING "3.7.4" /* Define to 1 if `lstat' dereferences a symlink specified with a trailing slash. 
 */
 
-#define VERSION "3.7.3"
+#define VERSION "3.7.4"
 
 /* Number of bits in a file offset, on hosts where this is settable. */
 /* #undef _FILE_OFFSET_BITS */

From 49ea016a3d73467afa85b1558b245eaac180c696 Mon Sep 17 00:00:00 2001
From: Nikita Taranov
Date: Tue, 10 Sep 2024 10:19:23 +0100
Subject: [PATCH 18/24] impl

---
 tests/queries/0_stateless/01603_read_with_backoff_bug.sql | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/queries/0_stateless/01603_read_with_backoff_bug.sql b/tests/queries/0_stateless/01603_read_with_backoff_bug.sql
index 8a6fa9b7845..212a18b5799 100644
--- a/tests/queries/0_stateless/01603_read_with_backoff_bug.sql
+++ b/tests/queries/0_stateless/01603_read_with_backoff_bug.sql
@@ -11,7 +11,7 @@ create table t (x UInt64, s String) engine = MergeTree order by x SETTINGS index
 INSERT INTO t SELECT
     number,
     if(number < (8129 * 1024), arrayStringConcat(arrayMap(x -> toString(x), range(number % 128)), ' '), '')
-FROM numbers_mt((8129 * 1024) * 3) settings max_insert_threads=8, max_rows_to_read=0;
+FROM numbers_mt((8129 * 1024) * 3) settings max_insert_threads=8, max_rows_to_read=0, max_memory_usage='10Gi';
 
 -- optimize table t final;

From 8b1f21e7dd491bbf56bd3ebdf1f00bee1a78d7fe Mon Sep 17 00:00:00 2001
From: "Mikhail f. Shiryaev"
Date: Tue, 10 Sep 2024 12:18:53 +0200
Subject: [PATCH 19/24] Hide exported logs statements in GH logs

---
 docker/test/base/setup_export_logs.sh | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/docker/test/base/setup_export_logs.sh b/docker/test/base/setup_export_logs.sh
index e544397dd0c..b007c3a3aa0 100755
--- a/docker/test/base/setup_export_logs.sh
+++ b/docker/test/base/setup_export_logs.sh
@@ -184,7 +184,12 @@ function setup_logs_replication
             /^TTL /d
             ')
 
-        echo -e "Creating remote destination table ${table}_${hash} with statement:\n${statement}" >&2
+        echo -e "Creating remote destination table ${table}_${hash} with statement:" >&2
+        echo "::group::${table}"
+        cat >&2 <<EOF
+${statement}
+EOF
+        echo "::endgroup::"

From: "Mikhail f. Shiryaev"
Date: Tue, 10 Sep 2024 12:56:19 +0200
Subject: [PATCH 20/24] Add a hostname to log; we need something to grep remote query_log

---
 docker/test/base/setup_export_logs.sh | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docker/test/base/setup_export_logs.sh b/docker/test/base/setup_export_logs.sh
index b007c3a3aa0..6924f9c79dd 100755
--- a/docker/test/base/setup_export_logs.sh
+++ b/docker/test/base/setup_export_logs.sh
@@ -124,6 +124,8 @@ function setup_logs_replication
     check_logs_credentials || return 0
     __set_connection_args
 
+    echo "My hostname is ${HOSTNAME}"
+
     echo 'Create all configured system logs'
     clickhouse-client --query "SYSTEM FLUSH LOGS"

From 41aaf075370a01dcfeda649a5c072e7746a48cb1 Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Tue, 10 Sep 2024 14:17:57 +0200
Subject: [PATCH 21/24] Test materialize() on sparse columns

---
 .../01780_column_sparse_materialize.reference | 50 ++++++++++++++++++
 .../01780_column_sparse_materialize.sql       | 52 +++++++++++++++++++
 2 files changed, 102 insertions(+)
 create mode 100644 tests/queries/0_stateless/01780_column_sparse_materialize.reference
 create mode 100644 tests/queries/0_stateless/01780_column_sparse_materialize.sql

diff --git a/tests/queries/0_stateless/01780_column_sparse_materialize.reference b/tests/queries/0_stateless/01780_column_sparse_materialize.reference
new file mode 100644
index 00000000000..660cfabff33
--- /dev/null
+++
b/tests/queries/0_stateless/01780_column_sparse_materialize.reference @@ -0,0 +1,50 @@ +-- { echoOn } + +SELECT dumpColumnStructure(id) FROM sparse_t; +UInt64, Sparse(size = 2, UInt64(size = 2), UInt64(size = 1)) +UInt64, Sparse(size = 2, UInt64(size = 2), UInt64(size = 1)) +SELECT dumpColumnStructure(materialize(id)) FROM sparse_t; +UInt64, UInt64(size = 2) +UInt64, UInt64(size = 2) +SELECT dumpColumnStructure(u) FROM sparse_t; +UInt64, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0)) +UInt64, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0)) +SELECT dumpColumnStructure(materialize(u)) FROM sparse_t; +UInt64, UInt64(size = 2) +UInt64, UInt64(size = 2) +SELECT dumpColumnStructure(s) FROM sparse_t; +String, Sparse(size = 2, String(size = 2), UInt64(size = 1)) +String, Sparse(size = 2, String(size = 2), UInt64(size = 1)) +SELECT dumpColumnStructure(materialize(s)) FROM sparse_t; +String, String(size = 2) +String, String(size = 2) +SELECT dumpColumnStructure(arr1) FROM sparse_t; +Array(String), Array(size = 2, UInt64(size = 2), String(size = 1)) +Array(String), Array(size = 2, UInt64(size = 2), String(size = 1)) +SELECT dumpColumnStructure(materialize(arr1)) FROM sparse_t; +Array(String), Array(size = 2, UInt64(size = 2), String(size = 1)) +Array(String), Array(size = 2, UInt64(size = 2), String(size = 1)) +SELECT dumpColumnStructure(arr2) FROM sparse_t; +Array(UInt64), Array(size = 2, UInt64(size = 2), UInt64(size = 1)) +Array(UInt64), Array(size = 2, UInt64(size = 2), UInt64(size = 1)) +SELECT dumpColumnStructure(materialize(arr2)) FROM sparse_t; +Array(UInt64), Array(size = 2, UInt64(size = 2), UInt64(size = 1)) +Array(UInt64), Array(size = 2, UInt64(size = 2), UInt64(size = 1)) +SELECT dumpColumnStructure(t) FROM sparse_t; +Tuple(a UInt64, s String), Tuple(size = 2, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0)), Sparse(size = 2, String(size = 1), UInt64(size = 0))) +Tuple(a UInt64, s String), Tuple(size = 2, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0)), Sparse(size = 2, String(size = 1), UInt64(size = 0))) +SELECT dumpColumnStructure(materialize(t)) FROM sparse_t; +Tuple(a UInt64, s String), Tuple(size = 2, UInt64(size = 2), String(size = 2)) +Tuple(a UInt64, s String), Tuple(size = 2, UInt64(size = 2), String(size = 2)) +SELECT dumpColumnStructure(t.a) FROM sparse_t; +UInt64, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0)) +UInt64, Sparse(size = 2, UInt64(size = 1), UInt64(size = 0)) +SELECT dumpColumnStructure(materialize(t.a)) FROM sparse_t; +UInt64, UInt64(size = 2) +UInt64, UInt64(size = 2) +SELECT dumpColumnStructure(t.s) FROM sparse_t; +String, Sparse(size = 2, String(size = 1), UInt64(size = 0)) +String, Sparse(size = 2, String(size = 1), UInt64(size = 0)) +SELECT dumpColumnStructure(materialize(t.s)) FROM sparse_t; +String, String(size = 2) +String, String(size = 2) diff --git a/tests/queries/0_stateless/01780_column_sparse_materialize.sql b/tests/queries/0_stateless/01780_column_sparse_materialize.sql new file mode 100644 index 00000000000..a53ea140f0f --- /dev/null +++ b/tests/queries/0_stateless/01780_column_sparse_materialize.sql @@ -0,0 +1,52 @@ +DROP TABLE IF EXISTS sparse_t; + +CREATE TABLE sparse_t ( + id UInt64, + u UInt64, + s String, + arr1 Array(String), + arr2 Array(UInt64), + t Tuple(a UInt64, s String)) +ENGINE = MergeTree ORDER BY tuple() +SETTINGS ratio_of_defaults_for_sparse_serialization = 0.1; + +INSERT INTO sparse_t SELECT + number, + if (number % 2 = 0, number, 0), + if (number % 2 = 0, toString(number), ''), + if (number % 2 = 0, 
+    if (number % 2 = 0, [0], []),
+    (if (number % 2 = 0, number, 0), '')
+FROM numbers(2);
+
+-- { echoOn }
+
+SELECT dumpColumnStructure(id) FROM sparse_t;
+SELECT dumpColumnStructure(materialize(id)) FROM sparse_t;
+
+SELECT dumpColumnStructure(u) FROM sparse_t;
+SELECT dumpColumnStructure(materialize(u)) FROM sparse_t;
+
+SELECT dumpColumnStructure(s) FROM sparse_t;
+SELECT dumpColumnStructure(materialize(s)) FROM sparse_t;
+
+SELECT dumpColumnStructure(arr1) FROM sparse_t;
+SELECT dumpColumnStructure(materialize(arr1)) FROM sparse_t;
+
+SELECT dumpColumnStructure(arr2) FROM sparse_t;
+SELECT dumpColumnStructure(materialize(arr2)) FROM sparse_t;
+
+SELECT dumpColumnStructure(t) FROM sparse_t;
+SELECT dumpColumnStructure(materialize(t)) FROM sparse_t;
+
+SELECT dumpColumnStructure(t.a) FROM sparse_t;
+SELECT dumpColumnStructure(materialize(t.a)) FROM sparse_t;
+
+SELECT dumpColumnStructure(t.s) FROM sparse_t;
+SELECT dumpColumnStructure(materialize(t.s)) FROM sparse_t;
+
+-- { echoOff }
+
+
+DROP TABLE IF EXISTS sparse_t
+;

From a01229bba2ce4ee7a3a02301205e2e4bcc47a2f8 Mon Sep 17 00:00:00 2001
From: Alexander Gololobov
Date: Tue, 10 Sep 2024 14:18:37 +0200
Subject: [PATCH 22/24] Convert sparse columns to full in materialize()

---
 src/Functions/materialize.h | 17 ++++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/src/Functions/materialize.h b/src/Functions/materialize.h
index 41994509745..571391faba7 100644
--- a/src/Functions/materialize.h
+++ b/src/Functions/materialize.h
@@ -2,7 +2,7 @@
 #include
 #include
 #include
-#include
+#include

 namespace DB
 {
@@ -18,11 +18,6 @@ public:
         return std::make_shared<FunctionMaterialize>();
     }

-    bool useDefaultImplementationForNulls() const override
-    {
-        return false;
-    }
-
     /// Get the function name.
String getName() const override { @@ -34,8 +29,16 @@ public: return true; } + bool useDefaultImplementationForNulls() const override { return false; } + + bool useDefaultImplementationForNothing() const override { return false; } + + bool useDefaultImplementationForConstants() const override { return false; } + bool useDefaultImplementationForLowCardinalityColumns() const override { return false; } + bool useDefaultImplementationForSparseColumns() const override { return false; } + bool isSuitableForConstantFolding() const override { return false; } bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; } @@ -52,7 +55,7 @@ public: ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override { - return arguments[0].column->convertToFullColumnIfConst(); + return recursiveRemoveSparse(arguments[0].column->convertToFullColumnIfConst()); } bool hasInformationAboutMonotonicity() const override { return true; } From 30cf6ee711d7cf02890e347328e3e3559625df28 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 10 Sep 2024 14:30:07 +0200 Subject: [PATCH 23/24] Update 03232_pr_not_ready_set.sql --- tests/queries/0_stateless/03232_pr_not_ready_set.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/queries/0_stateless/03232_pr_not_ready_set.sql b/tests/queries/0_stateless/03232_pr_not_ready_set.sql index 1a724085903..3b2d5d28cfb 100644 --- a/tests/queries/0_stateless/03232_pr_not_ready_set.sql +++ b/tests/queries/0_stateless/03232_pr_not_ready_set.sql @@ -1,3 +1,4 @@ +SYSTEM FLUSH LOGS; SELECT is_initial_query, count() AS c, From 7727b30f5cd7482a79e25ab3148f29380065aeec Mon Sep 17 00:00:00 2001 From: "Mikhail f. Shiryaev" Date: Tue, 10 Sep 2024 18:06:32 +0200 Subject: [PATCH 24/24] Attempt to fix EAGAIN error --- docker/test/base/setup_export_logs.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docker/test/base/setup_export_logs.sh b/docker/test/base/setup_export_logs.sh index 6924f9c79dd..3df9655701c 100755 --- a/docker/test/base/setup_export_logs.sh +++ b/docker/test/base/setup_export_logs.sh @@ -188,9 +188,9 @@ function setup_logs_replication echo -e "Creating remote destination table ${table}_${hash} with statement:" >&2 echo "::group::${table}" - cat >&2 <
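
[PATCH 22/24] makes materialize() undo sparse serialization as well as
constness: it disables the default sparse-column handling and calls
recursiveRemoveSparse() on the result. The effect can be observed directly;
below is a minimal sketch, assuming a server with the patch applied. The
table and column names here are illustrative and not part of the patches;
dumpColumnStructure and the ratio_of_defaults_for_sparse_serialization
setting are the same ones used by the 01780 test above.

    DROP TABLE IF EXISTS sparse_demo;

    CREATE TABLE sparse_demo (id UInt64, u UInt64)
    ENGINE = MergeTree ORDER BY tuple()
    SETTINGS ratio_of_defaults_for_sparse_serialization = 0.1;

    -- Mostly default (zero) values, so `u` is stored with sparse serialization.
    INSERT INTO sparse_demo SELECT number, if(number % 10 = 0, number, 0)
    FROM numbers(1000);

    -- Reads back as a Sparse column wrapper.
    SELECT dumpColumnStructure(u) FROM sparse_demo LIMIT 1;

    -- With the patch, materialize() returns a plain full column instead of
    -- passing the Sparse wrapper through.
    SELECT dumpColumnStructure(materialize(u)) FROM sparse_demo LIMIT 1;

    DROP TABLE sparse_demo;

Returning a full column here trades a little memory for predictability:
callers of materialize() get an ordinary column regardless of how the part
was serialized on disk, which is exactly what the 01780 test asserts.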