From 8186339307e0e5242439ab2286bd05fae1b489f5 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 5 Oct 2021 12:11:25 +0300 Subject: [PATCH 1/8] Less seeks in compressed buffers --- .../CachedCompressedReadBuffer.cpp | 37 +++++++++------ src/Compression/CachedCompressedReadBuffer.h | 10 ++++- .../CompressedReadBufferFromFile.cpp | 44 +++++++++++------- .../CompressedReadBufferFromFile.h | 16 +++++++ .../MergeTree/MergeTreeReaderWide.cpp | 45 ++++++++++++++----- src/Storages/MergeTree/MergeTreeReaderWide.h | 7 +-- 6 files changed, 114 insertions(+), 45 deletions(-) diff --git a/src/Compression/CachedCompressedReadBuffer.cpp b/src/Compression/CachedCompressedReadBuffer.cpp index d511266d139..bb0a17b730f 100644 --- a/src/Compression/CachedCompressedReadBuffer.cpp +++ b/src/Compression/CachedCompressedReadBuffer.cpp @@ -2,6 +2,7 @@ #include #include +#include #include @@ -30,7 +31,6 @@ void CachedCompressedReadBuffer::initInput() bool CachedCompressedReadBuffer::nextImpl() { - /// Let's check for the presence of a decompressed block in the cache, grab the ownership of this block, if it exists. UInt128 key = cache->hash(path, file_pos); @@ -60,6 +60,13 @@ bool CachedCompressedReadBuffer::nextImpl() working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - owned_cell->additional_bytes); + /// nextimpl_working_buffer_offset is set in the seek function (lazy seek). So we have to + /// check that we are not seeking beyond working buffer. 
+ if (nextimpl_working_buffer_offset > working_buffer.size()) + throw Exception("Seek position is beyond the decompressed block" + " (pos: " + toString(nextimpl_working_buffer_offset) + ", block size: " + toString(working_buffer.size()) + ")", + ErrorCodes::SEEK_POSITION_OUT_OF_BOUND); + file_pos += owned_cell->compressed_size; return true; @@ -74,28 +81,32 @@ CachedCompressedReadBuffer::CachedCompressedReadBuffer( void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block) { + + /// Nothing to do if we already at required position + if (file_pos == offset_in_compressed_file + && (offset() == offset_in_decompressed_block || + nextimpl_working_buffer_offset == offset_in_decompressed_block)) + return; + if (owned_cell && offset_in_compressed_file == file_pos - owned_cell->compressed_size && offset_in_decompressed_block <= working_buffer.size()) { - bytes += offset(); pos = working_buffer.begin() + offset_in_decompressed_block; - bytes -= offset(); } else { + + LOG_INFO(&Poco::Logger::get("DEBUG"), "Adding offset {}", offset()); + /// Remember position in compressed file (will be moved in nextImpl) file_pos = offset_in_compressed_file; - + /// We will discard our working_buffer, but have to account rest bytes bytes += offset(); - nextImpl(); - - if (offset_in_decompressed_block > working_buffer.size()) - throw Exception("Seek position is beyond the decompressed block" - " (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")", - ErrorCodes::SEEK_POSITION_OUT_OF_BOUND); - - pos = working_buffer.begin() + offset_in_decompressed_block; - bytes -= offset(); + /// No data, everything discarded + pos = working_buffer.end(); + /// Remember required offset in decompressed block which will be set in + /// the next ReadBuffer::next() call + nextimpl_working_buffer_offset = offset_in_decompressed_block; } } diff --git a/src/Compression/CachedCompressedReadBuffer.h 
b/src/Compression/CachedCompressedReadBuffer.h index c2338f6f841..bb24f699eed 100644 --- a/src/Compression/CachedCompressedReadBuffer.h +++ b/src/Compression/CachedCompressedReadBuffer.h @@ -15,7 +15,7 @@ namespace DB * The external cache is passed as an argument to the constructor. * Allows you to increase performance in cases where the same blocks are often read. * Disadvantages: - * - in case you need to read a lot of data in a row, but of them only a part is cached, you have to do seek-and. + * - in case you need to read a lot of data in a row, but only a part of it is cached, you have to do seeks. */ class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadBuffer { @@ -25,6 +25,8 @@ private: std::unique_ptr file_in; const std::string path; + + /// Current position in file_in size_t file_pos; /// A piece of data from the cache, or a piece of read data that we put into the cache. @@ -37,9 +39,15 @@ private: ReadBufferFromFileBase::ProfileCallback profile_callback; clockid_t clock_type {}; + + /// Check comment in CompressedReadBuffer + /* size_t nextimpl_working_buffer_offset; */ + public: CachedCompressedReadBuffer(const std::string & path, std::function()> file_in_creator, UncompressedCache * cache_, bool allow_different_codecs_ = false); + /// Seek is lazy. It doesn't move the position anywhere, it just remembers the target and performs the actual + /// seek inside nextImpl.
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block); void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE) diff --git a/src/Compression/CompressedReadBufferFromFile.cpp b/src/Compression/CompressedReadBufferFromFile.cpp index 934ce33a056..5760d1d638f 100644 --- a/src/Compression/CompressedReadBufferFromFile.cpp +++ b/src/Compression/CompressedReadBufferFromFile.cpp @@ -33,6 +33,13 @@ bool CompressedReadBufferFromFile::nextImpl() decompress(working_buffer, size_decompressed, size_compressed_without_checksum); + /// nextimpl_working_buffer_offset is set in the seek function (lazy seek). So we have to + /// check that we are not seeking beyond working buffer. + if (nextimpl_working_buffer_offset > working_buffer.size()) + throw Exception("Required to move position beyond the decompressed block" + " (pos: " + toString(nextimpl_working_buffer_offset) + ", block size: " + toString(working_buffer.size()) + ")", + ErrorCodes::SEEK_POSITION_OUT_OF_BOUND); + return true; } @@ -67,33 +74,34 @@ CompressedReadBufferFromFile::CompressedReadBufferFromFile( void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block) { + /// Nothing to do if we already at required position + if (file_in.getPosition() - size_compressed == offset_in_compressed_file && /// correct position in compressed file + (offset() == offset_in_decompressed_block /// correct position in buffer or + || nextimpl_working_buffer_offset == offset_in_decompressed_block)) /// we will move our position to correct one + return; + + /// Our seek is within working_buffer, so just move the position if (size_compressed && offset_in_compressed_file == file_in.getPosition() - size_compressed && offset_in_decompressed_block <= working_buffer.size()) { - bytes += offset(); pos = working_buffer.begin() + offset_in_decompressed_block; - /// `bytes` can overflow and get 
negative, but in `count()` everything will overflow back and get right. - bytes -= offset(); } - else + else /// Our seek outside working buffer, so perform "lazy seek" { + /// Actually seek compressed file file_in.seek(offset_in_compressed_file, SEEK_SET); - + /// We will discard our working_buffer, but have to account rest bytes bytes += offset(); - nextImpl(); - - if (offset_in_decompressed_block > working_buffer.size()) - throw Exception("Seek position is beyond the decompressed block" - " (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")", - ErrorCodes::SEEK_POSITION_OUT_OF_BOUND); - - pos = working_buffer.begin() + offset_in_decompressed_block; - bytes -= offset(); + /// No data, everything discarded + pos = working_buffer.end(); + size_compressed = 0; + /// Remember required offset in decompressed block which will be set in + /// the next ReadBuffer::next() call + nextimpl_working_buffer_offset = offset_in_decompressed_block; } } - size_t CompressedReadBufferFromFile::readBig(char * to, size_t n) { size_t bytes_read = 0; @@ -134,7 +142,11 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n) working_buffer = Buffer(memory.data(), &memory[size_decompressed]); decompress(working_buffer, size_decompressed, size_compressed_without_checksum); - pos = working_buffer.begin(); + + /// Manually take nextimpl_working_buffer_offset into account, because we don't use + /// nextImpl in this method. 
+ pos = working_buffer.begin() + nextimpl_working_buffer_offset; + nextimpl_working_buffer_offset = 0; bytes_read += read(to + bytes_read, n - bytes_read); break; diff --git a/src/Compression/CompressedReadBufferFromFile.h b/src/Compression/CompressedReadBufferFromFile.h index fcca48699b3..5f027851da3 100644 --- a/src/Compression/CompressedReadBufferFromFile.h +++ b/src/Compression/CompressedReadBufferFromFile.h @@ -28,6 +28,19 @@ private: ReadBufferFromFileBase & file_in; size_t size_compressed = 0; + /// This field is inherited from ReadBuffer. It's used to perform "lazy" seek, so in seek() call we: + /// 1) actually seek only underlying compressed file_in to offset_in_compressed_file; + /// 2) reset current working_buffer; + /// 3) remember the position in decompressed block in nextimpl_working_buffer_offset. + /// After following ReadBuffer::next() -> nextImpl call we will read new data into working_buffer and + /// ReadBuffer::next() will move our position in the fresh working_buffer to nextimpl_working_buffer_offset and + /// reset it to zero. + /// + /// NOTE: We have independent readBig implementation, so we have to take + /// nextimpl_working_buffer_offset into account there as well. + /// + /* size_t nextimpl_working_buffer_offset; */ + bool nextImpl() override; void prefetch() override; @@ -37,6 +50,9 @@ public: CompressedReadBufferFromFile( const std::string & path, const ReadSettings & settings, size_t estimated_size, bool allow_different_codecs_ = false); + /// Seek is lazy in some sense. We move position in compressed file_in to offset_in_compressed_file, but don't + /// read data into working_buffer and don't shift our position to offset_in_decompressed_block. Instead + /// we store this offset inside nextimpl_working_buffer_offset.
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block); size_t readBig(char * to, size_t n) override; diff --git a/src/Storages/MergeTree/MergeTreeReaderWide.cpp b/src/Storages/MergeTree/MergeTreeReaderWide.cpp index 2545120006d..9352eaff370 100644 --- a/src/Storages/MergeTree/MergeTreeReaderWide.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderWide.cpp @@ -75,6 +75,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si OffsetColumns offset_columns; std::unordered_map caches; + std::unordered_set prefetched_streams; if (disk->isRemote() ? settings.read_settings.remote_fs_prefetch : settings.read_settings.local_fs_prefetch) { /// Request reading of data in advance, @@ -86,7 +87,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si try { auto & cache = caches[column_from_part.getNameInStorage()]; - prefetch(column_from_part, from_mark, continue_reading, cache); + prefetch(column_from_part, from_mark, continue_reading, cache, prefetched_streams); } catch (Exception & e) { @@ -98,6 +99,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si } auto name_and_type = columns.begin(); + for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type) { auto column_from_part = getColumnFromPart(*name_and_type); @@ -114,7 +116,9 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si size_t column_size_before_reading = column->size(); auto & cache = caches[column_from_part.getNameInStorage()]; - readData(column_from_part, column, from_mark, continue_reading, max_rows_to_read, cache); + readData( + column_from_part, column, from_mark, continue_reading, + max_rows_to_read, cache, /* was_prefetched =*/ !prefetched_streams.empty()); /// For elements of Nested, column_size_before_reading may be greater than column size /// if offsets are not empty and were already read, but elements are empty. 
@@ -190,11 +194,11 @@ void MergeTreeReaderWide::addStreams(const NameAndTypePair & name_and_type, static ReadBuffer * getStream( - bool stream_for_prefix, + bool seek_to_start, const ISerialization::SubstreamPath & substream_path, MergeTreeReaderWide::FileStreams & streams, const NameAndTypePair & name_and_type, - size_t from_mark, bool continue_reading, + size_t from_mark, bool seek_to_mark, ISerialization::SubstreamsCache & cache) { /// If substream have already been read. @@ -209,9 +213,9 @@ static ReadBuffer * getStream( MergeTreeReaderStream & stream = *it->second; - if (stream_for_prefix) + if (seek_to_start) stream.seekToStart(); - else if (!continue_reading) + else if (seek_to_mark) stream.seekToMark(from_mark); return stream.data_buffer; @@ -222,15 +226,25 @@ void MergeTreeReaderWide::prefetch( const NameAndTypePair & name_and_type, size_t from_mark, bool continue_reading, - ISerialization::SubstreamsCache & cache) + ISerialization::SubstreamsCache & cache, + std::unordered_set & prefetched_streams) { const auto & name = name_and_type.name; auto & serialization = serializations[name]; serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path) { - if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, continue_reading, cache)) - buf->prefetch(); + bool seek_to_start = deserialize_binary_bulk_state_map.count(name) == 0; + String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path); + + if (!prefetched_streams.count(stream_name)) + { + bool seek_to_mark = !continue_reading; + if (ReadBuffer * buf = getStream(seek_to_start, substream_path, streams, name_and_type, from_mark, seek_to_mark, cache)) + buf->prefetch(); + + prefetched_streams.insert(stream_name); + } }); } @@ -238,7 +252,7 @@ void MergeTreeReaderWide::prefetch( void MergeTreeReaderWide::readData( const NameAndTypePair & name_and_type, ColumnPtr & column, size_t from_mark, bool continue_reading, size_t 
max_rows_to_read, - ISerialization::SubstreamsCache & cache) + ISerialization::SubstreamsCache & cache, bool was_prefetched) { double & avg_value_size_hint = avg_value_size_hints[name_and_type.name]; ISerialization::DeserializeBinaryBulkSettings deserialize_settings; @@ -251,14 +265,21 @@ void MergeTreeReaderWide::readData( { deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path) { - return getStream(true, substream_path, streams, name_and_type, from_mark, continue_reading, cache); + /// If data was already prefetched we don't need to seek to start + bool seek_to_start = !was_prefetched; + bool seek_to_mark = !was_prefetched && !continue_reading; + return getStream(seek_to_start, substream_path, streams, name_and_type, from_mark, seek_to_mark, cache); }; serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name]); } deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path) { - return getStream(false, substream_path, streams, name_and_type, from_mark, continue_reading, cache); + bool seek_to_mark = !was_prefetched && !continue_reading; + + return getStream( + /* seek_to_start = */false, substream_path, streams, name_and_type, from_mark, + seek_to_mark, cache); }; deserialize_settings.continuous_reading = continue_reading; auto & deserialize_state = deserialize_binary_bulk_state_map[name]; diff --git a/src/Storages/MergeTree/MergeTreeReaderWide.h b/src/Storages/MergeTree/MergeTreeReaderWide.h index d44732c007d..08d743370a9 100644 --- a/src/Storages/MergeTree/MergeTreeReaderWide.h +++ b/src/Storages/MergeTree/MergeTreeReaderWide.h @@ -46,14 +46,15 @@ private: void readData( const NameAndTypePair & name_and_type, ColumnPtr & column, size_t from_mark, bool continue_reading, size_t max_rows_to_read, - ISerialization::SubstreamsCache & cache); + ISerialization::SubstreamsCache & cache, bool was_prefetched); - /// Make next readData more simple by calling 
'prefetch' of all related ReadBuffers. + /// Make next readData more simple by calling 'prefetch' of all related ReadBuffers (column streams). void prefetch( const NameAndTypePair & name_and_type, size_t from_mark, bool continue_reading, - ISerialization::SubstreamsCache & cache); + ISerialization::SubstreamsCache & cache, + std::unordered_set & prefetched_streams); /// if stream was already prefetched do nothing }; } From 3a8126372c39413ee1560babd770e21d2187e0fc Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 5 Oct 2021 12:19:04 +0300 Subject: [PATCH 2/8] Remove redundant header --- src/Compression/CachedCompressedReadBuffer.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Compression/CachedCompressedReadBuffer.cpp b/src/Compression/CachedCompressedReadBuffer.cpp index bb0a17b730f..8c4f2eb03fa 100644 --- a/src/Compression/CachedCompressedReadBuffer.cpp +++ b/src/Compression/CachedCompressedReadBuffer.cpp @@ -2,7 +2,6 @@ #include #include -#include #include From c2f34928e001094373f7eb46a595fc5ac83f5f03 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 5 Oct 2021 13:04:44 +0300 Subject: [PATCH 3/8] Fixup --- src/Compression/CachedCompressedReadBuffer.cpp | 1 - src/Compression/CompressedReadBufferFromFile.cpp | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Compression/CachedCompressedReadBuffer.cpp b/src/Compression/CachedCompressedReadBuffer.cpp index 8c4f2eb03fa..e45d6338516 100644 --- a/src/Compression/CachedCompressedReadBuffer.cpp +++ b/src/Compression/CachedCompressedReadBuffer.cpp @@ -80,7 +80,6 @@ CachedCompressedReadBuffer::CachedCompressedReadBuffer( void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block) { - /// Nothing to do if we already at required position if (file_pos == offset_in_compressed_file && (offset() == offset_in_decompressed_block || diff --git a/src/Compression/CompressedReadBufferFromFile.cpp b/src/Compression/CompressedReadBufferFromFile.cpp index 
5760d1d638f..12f821890fb 100644 --- a/src/Compression/CompressedReadBufferFromFile.cpp +++ b/src/Compression/CompressedReadBufferFromFile.cpp @@ -75,7 +75,7 @@ CompressedReadBufferFromFile::CompressedReadBufferFromFile( void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block) { /// Nothing to do if we already at required position - if (file_in.getPosition() - size_compressed == offset_in_compressed_file && /// correct position in compressed file + if (!size_compressed && static_cast(file_in.getPosition()) == offset_in_compressed_file && /// correct position in compressed file (offset() == offset_in_decompressed_block /// correct position in buffer or || nextimpl_working_buffer_offset == offset_in_decompressed_block)) /// we will move our position to correct one return; From 64158c011b29f45a1000617f0100c7de555b77c2 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 5 Oct 2021 13:12:31 +0300 Subject: [PATCH 4/8] Remove logging --- src/Compression/CachedCompressedReadBuffer.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Compression/CachedCompressedReadBuffer.cpp b/src/Compression/CachedCompressedReadBuffer.cpp index e45d6338516..4a583773b4b 100644 --- a/src/Compression/CachedCompressedReadBuffer.cpp +++ b/src/Compression/CachedCompressedReadBuffer.cpp @@ -94,8 +94,6 @@ void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t o } else { - - LOG_INFO(&Poco::Logger::get("DEBUG"), "Adding offset {}", offset()); /// Remember position in compressed file (will be moved in nextImpl) file_pos = offset_in_compressed_file; /// We will discard our working_buffer, but have to account rest bytes From 9aab0eb0a2214986240c1835b4bf1e3856128886 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 5 Oct 2021 13:41:09 +0300 Subject: [PATCH 5/8] Fixup --- src/Storages/MergeTree/MergeTreeReaderWide.cpp | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git 
a/src/Storages/MergeTree/MergeTreeReaderWide.cpp b/src/Storages/MergeTree/MergeTreeReaderWide.cpp index 9352eaff370..206469da7be 100644 --- a/src/Storages/MergeTree/MergeTreeReaderWide.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderWide.cpp @@ -234,13 +234,12 @@ void MergeTreeReaderWide::prefetch( serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path) { - bool seek_to_start = deserialize_binary_bulk_state_map.count(name) == 0; String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path); if (!prefetched_streams.count(stream_name)) { bool seek_to_mark = !continue_reading; - if (ReadBuffer * buf = getStream(seek_to_start, substream_path, streams, name_and_type, from_mark, seek_to_mark, cache)) + if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, seek_to_mark, cache)) buf->prefetch(); prefetched_streams.insert(stream_name); @@ -265,10 +264,7 @@ void MergeTreeReaderWide::readData( { deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path) { - /// If data was already prefetched we don't need to seek to start - bool seek_to_start = !was_prefetched; - bool seek_to_mark = !was_prefetched && !continue_reading; - return getStream(seek_to_start, substream_path, streams, name_and_type, from_mark, seek_to_mark, cache); + return getStream(/* seek_to_start = */true, substream_path, streams, name_and_type, from_mark, /* seek_to_mark = */false, cache); }; serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name]); } From 342ec02664188df94852ad9375967ddc2922a75f Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Tue, 5 Oct 2021 21:06:36 +0300 Subject: [PATCH 6/8] Fix concurrent access to LowCardinality during GROUP BY (leads to SIGSEGV) The problem is that GROUP BY can update saved_hash, which can be also updated by subsequent update of a dictionary, and this will lead to use-after-free. 
You will find the ASan report in `details`.
==24679==ERROR: AddressSanitizer: heap-use-after-free on address 0x604000615d20 at pc 0x000022cc8684 bp 0x7ffea6b5f850 sp 0x7ffea6b5f848 READ of size 8 at 0x604000615d20 thread T223 (QueryPipelineEx) 0 0x22cc8683 in DB::ReverseIndex<>::insert(StringRef const&) obj-x86_64-linux-gnu/../src/Columns/ReverseIndex.h 1 0x22cc0de1 in COW::mutable_ptr DB::ColumnUnique::uniqueInsertRangeImpl()::'lambda'()::operator()() const obj-x86_64-linux-gnu/../src/Columns/ColumnUnique.h:540:39 2 0x22cc0de1 in COW::mutable_ptr DB::ColumnUnique::uniqueInsertRangeImpl() obj-x86_64-linux-gnu/../src/Columns/ColumnUnique.h:570:23 3 0x22cb9c66 in COW::mutable_ptr DB::ColumnUnique::uniqueInsertRangeFrom()::'lambda'(auto)::operator()(auto) const obj-x86_64-linux-gnu/../src/Columns/ColumnUnique.h:592:26 4 0x22cb9c66 in DB::ColumnUnique::uniqueInsertRangeFrom() obj-x86_64-linux-gnu/../src/Columns/ColumnUnique.h:600:28 5 0x2500b897 in DB::ColumnLowCardinality::insertRangeFrom() obj-x86_64-linux-gnu/../src/Columns/ColumnLowCardinality.cpp:205:62 6 0x25a182f4 in DB::appendBlock(DB::Block const&, DB::Block&) obj-x86_64-linux-gnu/../src/Storages/StorageBuffer.cpp:470:23 7 0x25a182f4 in DB::BufferSink::insertIntoBuffer(DB::Block const&, DB::StorageBuffer::Buffer&) obj-x86_64-linux-gnu/../src/Storages/StorageBuffer.cpp:634:9 8 0x25a173cc in DB::BufferSink::consume(DB::Chunk) obj-x86_64-linux-gnu/../src/Storages/StorageBuffer.cpp:595:9 9 0x26d1c997 in DB::SinkToStorage::transform(DB::Chunk&) obj-x86_64-linux-gnu/../src/Processors/Sinks/SinkToStorage.cpp:18:5 0x604000615d20 is located 16 bytes inside of 40-byte region [0x604000615d10,0x604000615d38) freed by thread T37 (QueryPipelineEx) here: 2 0x22cb9392 in boost::intrusive_ptr >::~intrusive_ptr() obj-x86_64-linux-gnu/../contrib/boost/boost/smart_ptr/intrusive_ptr.hpp:98:23 4 0x22cb9392 in COW::mutable_ptr >::operator=() obj-x86_64-linux-gnu/../src/Common/COW.h:100:57 5 0x22cb9392 in DB::ReverseIndex<>::tryGetSavedHash() const 
obj-x86_64-linux-gnu/../src/Columns/ReverseIndex.h:362:28 6 0x22cb9392 in DB::ColumnUnique::tryGetSavedHash() const obj-x86_64-linux-gnu/../src/Columns/ColumnUnique.h:125:76 7 0x242eaed3 in DB::ColumnsHashing::HashMethodSingleLowCardinalityColumn<>::HashMethodSingleLowCardinalityColumn() obj-x86_64-linux-gnu/../src/Common/ColumnsHashing.h:287:50 8 0x242206c6 in void DB::Aggregator::executeImpl<>() const obj-x86_64-linux-gnu/../src/Interpreters/Aggregator.cpp:596:28 9 0x24148e99 in DB::Aggregator::executeOnBlock() const obj-x86_64-linux-gnu/../src/Interpreters/Aggregator.cpp:1004:9 10 0x26c24f3a in DB::AggregatingTransform::consume(DB::Chunk) obj-x86_64-linux-gnu/../src/Processors/Transforms/AggregatingTransform.cpp:539:33 11 0x26c2054e in DB::AggregatingTransform::work() obj-x86_64-linux-gnu/../src/Processors/Transforms/AggregatingTransform.cpp:500:9 previously allocated by thread T37 (QueryPipelineEx) here: 0 0xb6d44fd in operator new(unsigned long) (/src/ch/tmp/upstream/clickhouse-asan+0xb6d44fd) 1 0x11b78580 in COW::mutable_ptr<> COWHelper<>::create(unsigned long&) (/src/ch/tmp/upstream/clickhouse-asan+0x11b78580) 2 0x22cbf7b1 in DB::ReverseIndex<>::calcHashes() const obj-x86_64-linux-gnu/../src/Columns/ReverseIndex.h:472:17 3 0x22cc2307 in DB::ReverseIndex<>::buildIndex() obj-x86_64-linux-gnu/../src/Columns/ReverseIndex.h:438:22 4 0x22cc658c in DB::ReverseIndex<>::insert(StringRef const&) obj-x86_64-linux-gnu/../src/Columns/ReverseIndex.h:484:9 5 0x22cc0de1 in COW::mutable_ptr DB::ColumnUnique::uniqueInsertRangeImpl()::'lambda'()::operator()() const obj-x86_64-linux-gnu/../src/Columns/ColumnUnique.h:540:39 6 0x22cc0de1 in COW::mutable_ptr DB::ColumnUnique::uniqueInsertRangeImpl() obj-x86_64-linux-gnu/../src/Columns/ColumnUnique.h:570:23 7 0x22cb9c66 in COW::mutable_ptr DB::ColumnUnique::uniqueInsertRangeFrom()::'lambda'(auto)::operator()(auto) const obj-x86_64-linux-gnu/../src/Columns/ColumnUnique.h:592:26 8 0x22cb9c66 in 
DB::ColumnUnique::uniqueInsertRangeFrom() obj-x86_64-linux-gnu/../src/Columns/ColumnUnique.h:600:28 9 0x2500b897 in DB::ColumnLowCardinality::insertRangeFrom() obj-x86_64-linux-gnu/../src/Columns/ColumnLowCardinality.cpp:205:62 10 0x25a182f4 in DB::appendBlock(DB::Block const&, DB::Block&) obj-x86_64-linux-gnu/../src/Storages/StorageBuffer.cpp:470:23 11 0x25a182f4 in DB::BufferSink::insertIntoBuffer(DB::Block const&, DB::StorageBuffer::Buffer&) obj-x86_64-linux-gnu/../src/Storages/StorageBuffer.cpp:634:9 12 0x25a173cc in DB::BufferSink::consume(DB::Chunk) obj-x86_64-linux-gnu/../src/Storages/StorageBuffer.cpp:595:9 13 0x26d1c997 in DB::SinkToStorage::transform(DB::Chunk&) obj-x86_64-linux-gnu/../src/Processors/Sinks/SinkToStorage.cpp:18:5 SUMMARY: AddressSanitizer: heap-use-after-free obj-x86_64-linux-gnu/../src/Columns/ReverseIndex.h in DB::ReverseIndex::insert(StringRef const&) Shadow bytes around the buggy address: 0x0c08800bab50: fa fa fd fd fd fd fd fa fa fa 00 00 00 00 00 00 0x0c08800bab60: fa fa fd fd fd fd fd fa fa fa 00 00 00 00 00 fa 0x0c08800bab70: fa fa 00 00 00 00 00 00 fa fa 00 00 00 00 00 00 0x0c08800bab80: fa fa fd fd fd fd fd fa fa fa fd fd fd fd fd fa 0x0c08800bab90: fa fa fd fd fd fd fd fa fa fa fd fd fd fd fd fd =>0x0c08800baba0: fa fa fd fd[fd]fd fd fa fa fa fd fd fd fd fd fa 0x0c08800babb0: fa fa fd fd fd fd fd fd fa fa fd fd fd fd fd fa 0x0c08800babc0: fa fa fd fd fd fd fd fa fa fa fd fd fd fd fd fd 0x0c08800babd0: fa fa fd fd fd fd fd fd fa fa fd fd fd fd fd fa 0x0c08800babe0: fa fa fd fd fd fd fd fa fa fa fd fd fd fd fd fd 0x0c08800babf0: fa fa fd fd fd fd fd fd fa fa fd fd fd fd fd fd Shadow byte legend (one shadow byte represents 8 application bytes): Addressable: 00 Partially addressable: 01 02 03 04 05 06 07 Heap left redzone: fa Freed heap region: fd Stack left redzone: f1 Stack mid redzone: f2 Stack right redzone: f3 Stack after return: f5 Stack use after scope: f8 Global redzone: f9 Global init order: f6 Poisoned by user: f7 
Container overflow: fc Array cookie: ac Intra object redzone: bb ASan internal: fe Left alloca redzone: ca Right alloca redzone: cb ==24679==ABORTING
--- src/Columns/ReverseIndex.h | 12 +++++----- ...ow_cardinality_parallel_group_by.reference | 0 ...02046_low_cardinality_parallel_group_by.sh | 22 +++++++++++++++++++ 3 files changed, 29 insertions(+), 5 deletions(-) create mode 100644 tests/queries/0_stateless/02046_low_cardinality_parallel_group_by.reference create mode 100755 tests/queries/0_stateless/02046_low_cardinality_parallel_group_by.sh diff --git a/src/Columns/ReverseIndex.h b/src/Columns/ReverseIndex.h index 9f13dbef1f9..1f49e2cb225 100644 --- a/src/Columns/ReverseIndex.h +++ b/src/Columns/ReverseIndex.h @@ -317,7 +317,7 @@ class ReverseIndex { public: ReverseIndex(UInt64 num_prefix_rows_to_skip_, UInt64 base_index_) - : num_prefix_rows_to_skip(num_prefix_rows_to_skip_), base_index(base_index_), saved_hash_ptr(nullptr) {} + : num_prefix_rows_to_skip(num_prefix_rows_to_skip_), base_index(base_index_), external_saved_hash_ptr(nullptr) {} void setColumn(ColumnType * column_); @@ -352,14 +352,14 @@ public: if (!use_saved_hash) return nullptr; - UInt64 * ptr = saved_hash_ptr.load(); + UInt64 * ptr = external_saved_hash_ptr.load(); if (!ptr) { auto hash = calcHashes(); ptr = &hash->getData()[0]; UInt64 * expected = nullptr; - if (saved_hash_ptr.compare_exchange_strong(expected, ptr)) - saved_hash = std::move(hash); + if (external_saved_hash_ptr.compare_exchange_strong(expected, ptr)) + external_saved_hash = std::move(hash); else ptr = expected; } @@ -379,7 +379,9 @@ private: /// Lazy initialized. 
std::unique_ptr index; mutable ColumnUInt64::MutablePtr saved_hash; - mutable std::atomic saved_hash_ptr; + /// For usage during GROUP BY + mutable ColumnUInt64::MutablePtr external_saved_hash; + mutable std::atomic external_saved_hash_ptr; void buildIndex(); diff --git a/tests/queries/0_stateless/02046_low_cardinality_parallel_group_by.reference b/tests/queries/0_stateless/02046_low_cardinality_parallel_group_by.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/02046_low_cardinality_parallel_group_by.sh b/tests/queries/0_stateless/02046_low_cardinality_parallel_group_by.sh new file mode 100755 index 00000000000..ec58f36cede --- /dev/null +++ b/tests/queries/0_stateless/02046_low_cardinality_parallel_group_by.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +# This is the regression test for parallel usage of LowCardinality column +# via Buffer engine. 
+# +# See also: +# - https://github.com/ClickHouse/ClickHouse/issues/24158 +# - https://github.com/ClickHouse/ClickHouse/pull/3138 + +$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS low_card_buffer_test" +$CLICKHOUSE_CLIENT -q "CREATE TABLE low_card_buffer_test (test_text LowCardinality(String)) ENGINE=Buffer('', '', 16, 60, 360, 100, 1000, 10000, 100000)" + +$CLICKHOUSE_BENCHMARK -d 0 -i 1000 -c 5 <<<"SELECT count() FROM low_card_buffer_test GROUP BY test_text format Null" 2>/dev/null & +$CLICKHOUSE_BENCHMARK -d 0 -i 1000 -c 2 <<<"INSERT INTO low_card_buffer_test values('TEST1')" 2>/dev/null & +wait + +# server is alive +$CLICKHOUSE_CLIENT -q "SELECT 1 FORMAT Null" From 621c9884fc414c13fb0283e93b289ff333af9a40 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 6 Oct 2021 12:39:54 +0300 Subject: [PATCH 7/8] Account offset in readBig call --- src/Compression/CompressedReadBufferFromFile.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Compression/CompressedReadBufferFromFile.cpp b/src/Compression/CompressedReadBufferFromFile.cpp index 12f821890fb..2cfd6d65c1c 100644 --- a/src/Compression/CompressedReadBufferFromFile.cpp +++ b/src/Compression/CompressedReadBufferFromFile.cpp @@ -123,9 +123,11 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n) auto additional_size_at_the_end_of_buffer = codec->getAdditionalSizeAtTheEndOfBuffer(); - /// If the decompressed block fits entirely where it needs to be copied. - if (size_decompressed + additional_size_at_the_end_of_buffer <= n - bytes_read) + /// If the decompressed block fits entirely where it needs to be copied and we don't + /// need to skip some bytes in decompressed data (seek happened before readBig call). 
+ if (nextimpl_working_buffer_offset == 0 && size_decompressed + additional_size_at_the_end_of_buffer <= n - bytes_read) { + decompressTo(to + bytes_read, size_decompressed, size_compressed_without_checksum); bytes_read += size_decompressed; bytes += size_decompressed; From a62728c16e6a4f81d411e455b6026820c1ae7a55 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Wed, 6 Oct 2021 21:11:59 +0300 Subject: [PATCH 8/8] Mark 02046_low_cardinality_parallel_group_by as long --- .../0_stateless/02046_low_cardinality_parallel_group_by.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/queries/0_stateless/02046_low_cardinality_parallel_group_by.sh b/tests/queries/0_stateless/02046_low_cardinality_parallel_group_by.sh index ec58f36cede..c2ae622e6a8 100755 --- a/tests/queries/0_stateless/02046_low_cardinality_parallel_group_by.sh +++ b/tests/queries/0_stateless/02046_low_cardinality_parallel_group_by.sh @@ -1,4 +1,5 @@ #!/usr/bin/env bash +# Tags: long CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh