From 8186339307e0e5242439ab2286bd05fae1b489f5 Mon Sep 17 00:00:00 2001
From: alesapin
Date: Tue, 5 Oct 2021 12:11:25 +0300
Subject: [PATCH] Less seeks in compressed buffers

---
 .../CachedCompressedReadBuffer.cpp            | 37 +++++++++------
 src/Compression/CachedCompressedReadBuffer.h  | 10 ++++-
 .../CompressedReadBufferFromFile.cpp          | 44 +++++++++++-------
 .../CompressedReadBufferFromFile.h            | 16 +++++++
 .../MergeTree/MergeTreeReaderWide.cpp         | 45 ++++++++++++++-----
 src/Storages/MergeTree/MergeTreeReaderWide.h  |  7 +--
 6 files changed, 114 insertions(+), 45 deletions(-)

diff --git a/src/Compression/CachedCompressedReadBuffer.cpp b/src/Compression/CachedCompressedReadBuffer.cpp
index d511266d139..bb0a17b730f 100644
--- a/src/Compression/CachedCompressedReadBuffer.cpp
+++ b/src/Compression/CachedCompressedReadBuffer.cpp
@@ -2,6 +2,7 @@
 #include
 #include
+#include
 #include

@@ -30,7 +31,6 @@ void CachedCompressedReadBuffer::initInput()

 bool CachedCompressedReadBuffer::nextImpl()
 {
-
     /// Let's check for the presence of a decompressed block in the cache, grab the ownership of this block, if it exists.
     UInt128 key = cache->hash(path, file_pos);

@@ -60,6 +60,13 @@ bool CachedCompressedReadBuffer::nextImpl()

     working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - owned_cell->additional_bytes);

+    /// nextimpl_working_buffer_offset is set in the seek function (lazy seek), so we have to
+    /// check that we are not seeking beyond the working buffer.
+    if (nextimpl_working_buffer_offset > working_buffer.size())
+        throw Exception("Seek position is beyond the decompressed block"
+            " (pos: " + toString(nextimpl_working_buffer_offset) + ", block size: " + toString(working_buffer.size()) + ")",
+            ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
+
     file_pos += owned_cell->compressed_size;

     return true;
@@ -74,28 +81,32 @@ CachedCompressedReadBuffer::CachedCompressedReadBuffer(

 void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
 {
+
+    /// Nothing to do if we are already at the required position.
+    if (file_pos == offset_in_compressed_file
+        && (offset() == offset_in_decompressed_block ||
+            nextimpl_working_buffer_offset == offset_in_decompressed_block))
+        return;
+
     if (owned_cell &&
         offset_in_compressed_file == file_pos - owned_cell->compressed_size &&
         offset_in_decompressed_block <= working_buffer.size())
     {
-        bytes += offset();
         pos = working_buffer.begin() + offset_in_decompressed_block;
-        bytes -= offset();
     }
     else
     {
+
+        LOG_INFO(&Poco::Logger::get("DEBUG"), "Adding offset {}", offset());
+        /// Remember the position in the compressed file (it will be advanced in nextImpl).
         file_pos = offset_in_compressed_file;
-
+        /// We will discard our working_buffer, but still have to account for the bytes already read from it.
        bytes += offset();
-        nextImpl();
-
-        if (offset_in_decompressed_block > working_buffer.size())
-            throw Exception("Seek position is beyond the decompressed block"
-                " (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")",
-                ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
-
-        pos = working_buffer.begin() + offset_in_decompressed_block;
-        bytes -= offset();
+        /// No data left, everything is discarded.
+        pos = working_buffer.end();
+        /// Remember the required offset in the decompressed block; it will be applied in
+        /// the next ReadBuffer::next() call.
+        nextimpl_working_buffer_offset = offset_in_decompressed_block;
    }
 }
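The lazy seek introduced above boils down to a small state machine: seek() only records the target position in the compressed file and the target offset inside the decompressed block, and the next read materialises both. The following standalone sketch models that pattern with simplified types (LazySeekBuffer, a fake loadBlockAt instead of real decompression or the uncompressed cache); it is an illustration only, not the ClickHouse classes themselves.

#include <cstddef>
#include <stdexcept>
#include <vector>

// Simplified model of the lazy seek used by CachedCompressedReadBuffer (hypothetical names).
struct LazySeekBuffer
{
    std::vector<char> working;    // decompressed block (the "working_buffer")
    size_t pos = 0;               // current position inside `working`
    size_t file_pos = 0;          // position in the compressed file
    size_t deferred_offset = 0;   // plays the role of nextimpl_working_buffer_offset

    // In the real code this would decompress (or fetch from the cache) the block at file_pos.
    void loadBlockAt(size_t compressed_pos)
    {
        file_pos = compressed_pos;
        working.assign(64 * 1024, 0);   // pretend every block decompresses to 64 KiB
        pos = 0;
    }

    void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
    {
        // Fast path: we are already where the caller wants us to be.
        if (file_pos == offset_in_compressed_file
            && (pos == offset_in_decompressed_block || deferred_offset == offset_in_decompressed_block))
            return;

        // Lazy path: remember the target, discard the current block, do no decompression yet.
        file_pos = offset_in_compressed_file;
        pos = working.size();           // "pos = working_buffer.end()": everything discarded
        deferred_offset = offset_in_decompressed_block;
    }

    // Equivalent of ReadBuffer::next() + nextImpl(): load the block, then apply the deferred offset.
    void next()
    {
        loadBlockAt(file_pos);
        if (deferred_offset > working.size())
            throw std::runtime_error("Seek position is beyond the decompressed block");
        pos = deferred_offset;
        deferred_offset = 0;
    }
};

int main()
{
    LazySeekBuffer buf;
    buf.seek(4096, 100);   // no decompression happens here
    buf.seek(8192, 200);   // still nothing: the first seek is simply overwritten
    buf.next();            // only now a block is "loaded" and pos becomes 200
    return buf.pos == 200 ? 0 : 1;
}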
diff --git a/src/Compression/CachedCompressedReadBuffer.h b/src/Compression/CachedCompressedReadBuffer.h
index c2338f6f841..bb24f699eed 100644
--- a/src/Compression/CachedCompressedReadBuffer.h
+++ b/src/Compression/CachedCompressedReadBuffer.h
@@ -15,7 +15,7 @@ namespace DB
  * The external cache is passed as an argument to the constructor.
  * Allows you to increase performance in cases where the same blocks are often read.
  * Disadvantages:
- * - in case you need to read a lot of data in a row, but of them only a part is cached, you have to do seek-and.
+ * - in case you need to read a lot of data in a row, but only a part of it is cached, you have to do seek-and.
  */
 class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadBuffer
 {
@@ -25,6 +25,8 @@ private:
     std::unique_ptr file_in;
     const std::string path;
+
+    /// Current position in file_in.
     size_t file_pos;

     /// A piece of data from the cache, or a piece of read data that we put into the cache.
@@ -37,9 +39,15 @@ private:
     ReadBufferFromFileBase::ProfileCallback profile_callback;
     clockid_t clock_type {};
+
+    /// See the comment in CompressedReadBufferFromFile.
+    /* size_t nextimpl_working_buffer_offset; */
+
 public:
     CachedCompressedReadBuffer(const std::string & path, std::function()> file_in_creator, UncompressedCache * cache_, bool allow_different_codecs_ = false);

+    /// Seek is lazy. It doesn't move the position anywhere, it only remembers the offsets and performs the actual
+    /// seek inside nextImpl.
     void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);

     void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE)
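The commented-out field above refers to nextimpl_working_buffer_offset, which both buffers inherit from ReadBuffer. The contract the patch relies on, as described in the long comment added to CompressedReadBufferFromFile.h further down, is roughly the following: the base-class next() positions the cursor inside the freshly filled buffer and then clears the offset. ToyReadBuffer below is a simplified stand-in for the real base class (which lives in src/IO/ReadBuffer.h), shown here only to make that contract concrete.

#include <cstddef>
#include <iostream>

// Hedged, simplified rendition of the ReadBuffer::next() contract used by the lazy seek.
struct ToyReadBuffer
{
    size_t working_buffer_size = 0;
    size_t pos = 0;
    size_t nextimpl_working_buffer_offset = 0;   // set by a lazy seek(), consumed by next()

    virtual ~ToyReadBuffer() = default;
    virtual bool nextImpl() = 0;                 // derived class refills the working buffer

    bool next()
    {
        if (!nextImpl())
            return false;
        pos = nextimpl_working_buffer_offset;    // jump to the position the seek asked for
        nextimpl_working_buffer_offset = 0;      // the offset is applied exactly once
        return true;
    }
};

struct OneBlockBuffer : ToyReadBuffer
{
    bool nextImpl() override { working_buffer_size = 10; return true; }
};

int main()
{
    OneBlockBuffer buf;
    buf.nextimpl_working_buffer_offset = 3;      // what a lazy seek() would have stored
    buf.next();
    std::cout << buf.pos << '\n';                // prints 3
}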
diff --git a/src/Compression/CompressedReadBufferFromFile.cpp b/src/Compression/CompressedReadBufferFromFile.cpp
index 934ce33a056..5760d1d638f 100644
--- a/src/Compression/CompressedReadBufferFromFile.cpp
+++ b/src/Compression/CompressedReadBufferFromFile.cpp
@@ -33,6 +33,13 @@ bool CompressedReadBufferFromFile::nextImpl()

     decompress(working_buffer, size_decompressed, size_compressed_without_checksum);

+    /// nextimpl_working_buffer_offset is set in the seek function (lazy seek), so we have to
+    /// check that we are not seeking beyond the working buffer.
+    if (nextimpl_working_buffer_offset > working_buffer.size())
+        throw Exception("Required to move position beyond the decompressed block"
+            " (pos: " + toString(nextimpl_working_buffer_offset) + ", block size: " + toString(working_buffer.size()) + ")",
+            ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
+
     return true;
 }

@@ -67,33 +74,34 @@ CompressedReadBufferFromFile::CompressedReadBufferFromFile(

 void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
 {
+    /// Nothing to do if we are already at the required position.
+    if (file_in.getPosition() - size_compressed == offset_in_compressed_file && /// correct position in the compressed file
+        (offset() == offset_in_decompressed_block /// correct position in the buffer, or
+         || nextimpl_working_buffer_offset == offset_in_decompressed_block)) /// we will move our position to the correct one
+        return;
+
+    /// Our seek is within the working_buffer, so just move the position.
     if (size_compressed &&
         offset_in_compressed_file == file_in.getPosition() - size_compressed &&
         offset_in_decompressed_block <= working_buffer.size())
     {
-        bytes += offset();
         pos = working_buffer.begin() + offset_in_decompressed_block;
-        /// `bytes` can overflow and get negative, but in `count()` everything will overflow back and get right.
-        bytes -= offset();
     }
-    else
+    else /// Our seek is outside the working buffer, so perform a "lazy seek".
     {
+        /// Actually seek the compressed file.
         file_in.seek(offset_in_compressed_file, SEEK_SET);
-
+        /// We will discard our working_buffer, but still have to account for the bytes already read from it.
         bytes += offset();
-        nextImpl();
-
-        if (offset_in_decompressed_block > working_buffer.size())
-            throw Exception("Seek position is beyond the decompressed block"
-                " (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")",
-                ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
-
-        pos = working_buffer.begin() + offset_in_decompressed_block;
-        bytes -= offset();
+        /// No data left, everything is discarded.
+        pos = working_buffer.end();
+        size_compressed = 0;
+        /// Remember the required offset in the decompressed block; it will be applied in
+        /// the next ReadBuffer::next() call.
+        nextimpl_working_buffer_offset = offset_in_decompressed_block;
     }
 }

-
 size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
 {
     size_t bytes_read = 0;
@@ -134,7 +142,11 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)

             working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
             decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
-            pos = working_buffer.begin();
+
+            /// Manually take nextimpl_working_buffer_offset into account, because we don't use
+            /// nextImpl in this method.
+            pos = working_buffer.begin() + nextimpl_working_buffer_offset;
+            nextimpl_working_buffer_offset = 0;

             bytes_read += read(to + bytes_read, n - bytes_read);
             break;
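Because readBig() pulls blocks without going through ReadBuffer::next(), the hunk above makes it apply the stored offset to the first block it decompresses and then clear it. A standalone model of that detail, using a ToyCompressedFile with fake pre-decompressed blocks and no checksums (invented names, for illustration only):

#include <algorithm>
#include <cstddef>
#include <cstring>
#include <vector>

// Simplified model of a readBig()-style loop that must honour the lazily stored in-block offset
// itself, because it never calls next().
struct ToyCompressedFile
{
    std::vector<std::vector<char>> blocks;      // pre-"decompressed" blocks of the file
    size_t next_block = 0;
    size_t nextimpl_working_buffer_offset = 0;  // left behind by a lazy seek()

    size_t readBig(char * to, size_t n)
    {
        size_t bytes_read = 0;
        while (bytes_read < n && next_block < blocks.size())
        {
            const auto & block = blocks[next_block++];
            // pos = working_buffer.begin() + nextimpl_working_buffer_offset; (affects the first block only)
            size_t pos = nextimpl_working_buffer_offset;
            nextimpl_working_buffer_offset = 0;

            size_t to_copy = std::min(n - bytes_read, block.size() - pos);
            std::memcpy(to + bytes_read, block.data() + pos, to_copy);
            bytes_read += to_copy;
        }
        return bytes_read;
    }
};

int main()
{
    ToyCompressedFile file;
    file.blocks = {std::vector<char>(8, 'a'), std::vector<char>(8, 'b')};
    file.nextimpl_working_buffer_offset = 5;    // a lazy seek asked to start 5 bytes into the first block
    char out[16] = {};
    size_t got = file.readBig(out, sizeof(out));
    return got == 11 ? 0 : 1;                   // 3 bytes from block 0, 8 bytes from block 1
}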
diff --git a/src/Compression/CompressedReadBufferFromFile.h b/src/Compression/CompressedReadBufferFromFile.h
index fcca48699b3..5f027851da3 100644
--- a/src/Compression/CompressedReadBufferFromFile.h
+++ b/src/Compression/CompressedReadBufferFromFile.h
@@ -28,6 +28,19 @@ private:
     ReadBufferFromFileBase & file_in;
     size_t size_compressed = 0;

+    /// This field is inherited from ReadBuffer. It is used to perform a "lazy" seek, so in a seek() call we:
+    /// 1) actually seek only the underlying compressed file_in to offset_in_compressed_file;
+    /// 2) reset the current working_buffer;
+    /// 3) remember the position in the decompressed block in nextimpl_working_buffer_offset.
+    /// In the following ReadBuffer::next() -> nextImpl call we will read new data into the working_buffer, and
+    /// ReadBuffer::next() will move our position in the fresh working_buffer to nextimpl_working_buffer_offset and
+    /// reset it to zero.
+    ///
+    /// NOTE: We have an independent readBig implementation, so we have to take
+    /// nextimpl_working_buffer_offset into account there as well.
+    ///
+    /* size_t nextimpl_working_buffer_offset; */
+
     bool nextImpl() override;

     void prefetch() override;
@@ -37,6 +50,9 @@ public:
     CompressedReadBufferFromFile(
         const std::string & path, const ReadSettings & settings, size_t estimated_size, bool allow_different_codecs_ = false);

+    /// Seek is lazy in some sense. We move the position in the compressed file_in to offset_in_compressed_file, but don't
+    /// read data into the working_buffer and don't shift our position to offset_in_decompressed_block. Instead
+    /// we store this offset inside nextimpl_working_buffer_offset.
     void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);

     size_t readBig(char * to, size_t n) override;

diff --git a/src/Storages/MergeTree/MergeTreeReaderWide.cpp b/src/Storages/MergeTree/MergeTreeReaderWide.cpp
index 2545120006d..9352eaff370 100644
--- a/src/Storages/MergeTree/MergeTreeReaderWide.cpp
+++ b/src/Storages/MergeTree/MergeTreeReaderWide.cpp
@@ -75,6 +75,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
     OffsetColumns offset_columns;
     std::unordered_map caches;

+    std::unordered_set prefetched_streams;
     if (disk->isRemote() ? settings.read_settings.remote_fs_prefetch : settings.read_settings.local_fs_prefetch)
     {
         /// Request reading of data in advance,
@@ -86,7 +87,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
             try
             {
                 auto & cache = caches[column_from_part.getNameInStorage()];
-                prefetch(column_from_part, from_mark, continue_reading, cache);
+                prefetch(column_from_part, from_mark, continue_reading, cache, prefetched_streams);
             }
             catch (Exception & e)
             {
@@ -98,6 +99,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
     }

     auto name_and_type = columns.begin();
+
     for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
     {
         auto column_from_part = getColumnFromPart(*name_and_type);
@@ -114,7 +116,9 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
                 size_t column_size_before_reading = column->size();
                 auto & cache = caches[column_from_part.getNameInStorage()];

-                readData(column_from_part, column, from_mark, continue_reading, max_rows_to_read, cache);
+                readData(
+                    column_from_part, column, from_mark, continue_reading,
+                    max_rows_to_read, cache, /* was_prefetched = */ !prefetched_streams.empty());

                 /// For elements of Nested, column_size_before_reading may be greater than column size
                 /// if offsets are not empty and were already read, but elements are empty.
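The prefetched_streams set exists because several requested columns can resolve to the same on-disk stream (for example, Nested subcolumns sharing an offsets stream), and each stream should be positioned and prefetched only once per readRows() call. A self-contained sketch of that deduplication idea; streamsOf(), prefetchColumn() and the stream names are invented for the example and do not exist in the patch.

#include <iostream>
#include <string>
#include <unordered_set>
#include <vector>

// Which on-disk streams a column reads from (made up for the example).
std::vector<std::string> streamsOf(const std::string & column)
{
    if (column == "n.id" || column == "n.value")
        return {"n.size0", column};          // Nested subcolumns share the offsets stream
    return {column};
}

void prefetchColumn(const std::string & column, std::unordered_set<std::string> & prefetched_streams)
{
    for (const auto & stream_name : streamsOf(column))
        if (prefetched_streams.insert(stream_name).second)    // first time we see this stream
            std::cout << "prefetch " << stream_name << '\n';  // real code: seek once, then buf->prefetch()
}

int main()
{
    std::unordered_set<std::string> prefetched_streams;
    prefetchColumn("n.id", prefetched_streams);     // prefetches n.size0 and n.id
    prefetchColumn("n.value", prefetched_streams);  // prefetches only n.value; n.size0 is skipped
}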
@@ -190,11 +194,11 @@ void MergeTreeReaderWide::addStreams(const NameAndTypePair & name_and_type,

 static ReadBuffer * getStream(
-    bool stream_for_prefix,
+    bool seek_to_start,
     const ISerialization::SubstreamPath & substream_path,
     MergeTreeReaderWide::FileStreams & streams,
     const NameAndTypePair & name_and_type,
-    size_t from_mark, bool continue_reading,
+    size_t from_mark, bool seek_to_mark,
     ISerialization::SubstreamsCache & cache)
 {
     /// If the substream has already been read.
@@ -209,9 +213,9 @@ static ReadBuffer * getStream(

     MergeTreeReaderStream & stream = *it->second;

-    if (stream_for_prefix)
+    if (seek_to_start)
         stream.seekToStart();
-    else if (!continue_reading)
+    else if (seek_to_mark)
         stream.seekToMark(from_mark);

     return stream.data_buffer;
@@ -222,15 +226,25 @@ void MergeTreeReaderWide::prefetch(
     const NameAndTypePair & name_and_type,
     size_t from_mark,
     bool continue_reading,
-    ISerialization::SubstreamsCache & cache)
+    ISerialization::SubstreamsCache & cache,
+    std::unordered_set & prefetched_streams)
 {
     const auto & name = name_and_type.name;
     auto & serialization = serializations[name];

     serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
     {
-        if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, continue_reading, cache))
-            buf->prefetch();
+        bool seek_to_start = deserialize_binary_bulk_state_map.count(name) == 0;
+        String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);
+
+        if (!prefetched_streams.count(stream_name))
+        {
+            bool seek_to_mark = !continue_reading;
+            if (ReadBuffer * buf = getStream(seek_to_start, substream_path, streams, name_and_type, from_mark, seek_to_mark, cache))
+                buf->prefetch();
+
+            prefetched_streams.insert(stream_name);
+        }
     });
 }

@@ -238,7 +252,7 @@ void MergeTreeReaderWide::prefetch(
 void MergeTreeReaderWide::readData(
     const NameAndTypePair & name_and_type, ColumnPtr & column,
     size_t from_mark, bool continue_reading, size_t max_rows_to_read,
-    ISerialization::SubstreamsCache & cache)
+    ISerialization::SubstreamsCache & cache, bool was_prefetched)
 {
     double & avg_value_size_hint = avg_value_size_hints[name_and_type.name];
     ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
@@ -251,14 +265,21 @@ void MergeTreeReaderWide::readData(
     {
         deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
         {
-            return getStream(true, substream_path, streams, name_and_type, from_mark, continue_reading, cache);
+            /// If the data was already prefetched we don't need to seek to the start.
+            bool seek_to_start = !was_prefetched;
+            bool seek_to_mark = !was_prefetched && !continue_reading;
+            return getStream(seek_to_start, substream_path, streams, name_and_type, from_mark, seek_to_mark, cache);
         };
         serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name]);
     }

     deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
     {
-        return getStream(false, substream_path, streams, name_and_type, from_mark, continue_reading, cache);
+        bool seek_to_mark = !was_prefetched && !continue_reading;
+
+        return getStream(
+            /* seek_to_start = */ false, substream_path, streams, name_and_type, from_mark,
+            seek_to_mark, cache);
     };
     deserialize_settings.continuous_reading = continue_reading;
     auto & deserialize_state = deserialize_binary_bulk_state_map[name];
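The renamed getStream() flags make the positioning decision explicit: a stream is moved at most once per readRows() call. The summary below simply restates the flags computed in the hunks above in compilable form (inside getStream(), seek_to_start wins over seek_to_mark); it is an illustration, not the reader code itself.

#include <iostream>

// Summary of how MergeTreeReaderWide decides to position a stream after this patch.
struct SeekDecision
{
    bool seek_to_start;   // rewind the stream to read the serialization prefix
    bool seek_to_mark;    // jump to the requested mark
};

// Getter used while reading the deserialization prefix.
SeekDecision forPrefix(bool was_prefetched, bool continue_reading)
{
    // If the data was already prefetched, the stream is positioned; don't seek to the start again.
    return {!was_prefetched, !was_prefetched && !continue_reading};
}

// Getter used for the bulk data itself.
SeekDecision forData(bool was_prefetched, bool continue_reading)
{
    // Seek to the mark only if nobody positioned the stream yet and this is not a continuation
    // of the previous readRows() call.
    return {false, !was_prefetched && !continue_reading};
}

int main()
{
    SeekDecision d = forData(/* was_prefetched = */ true, /* continue_reading = */ false);
    std::cout << d.seek_to_start << ' ' << d.seek_to_mark << '\n';   // "0 0": no extra seek needed
}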
diff --git a/src/Storages/MergeTree/MergeTreeReaderWide.h b/src/Storages/MergeTree/MergeTreeReaderWide.h
index d44732c007d..08d743370a9 100644
--- a/src/Storages/MergeTree/MergeTreeReaderWide.h
+++ b/src/Storages/MergeTree/MergeTreeReaderWide.h
@@ -46,14 +46,15 @@ private:
     void readData(
         const NameAndTypePair & name_and_type, ColumnPtr & column,
         size_t from_mark, bool continue_reading, size_t max_rows_to_read,
-        ISerialization::SubstreamsCache & cache);
+        ISerialization::SubstreamsCache & cache, bool was_prefetched);

-    /// Make next readData more simple by calling 'prefetch' of all related ReadBuffers.
+    /// Make the next readData call simpler by calling 'prefetch' for all related ReadBuffers (column streams).
     void prefetch(
         const NameAndTypePair & name_and_type,
         size_t from_mark,
         bool continue_reading,
-        ISerialization::SubstreamsCache & cache);
+        ISerialization::SubstreamsCache & cache,
+        std::unordered_set & prefetched_streams); /// If the stream was already prefetched, do nothing.
 };

 }
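Taken together, one readRows() call now proceeds in two phases: first position and prefetch every stream exactly once, then deserialize without touching the file positions again. The outline below uses placeholder helpers (prefetchColumn, readColumn, readRowsSketch) purely to show the intended order of operations; it is not the real MergeTreeReaderWide code.

#include <string>
#include <unordered_set>
#include <vector>

struct Column { std::string name; };

// Placeholder for: seek each of the column's streams once and issue prefetch requests.
void prefetchColumn(const Column &, std::unordered_set<std::string> & prefetched) { prefetched.insert("stream"); }

// Placeholder for: deserialize rows; must not seek again when the streams were already positioned.
void readColumn(const Column &, bool /* was_prefetched */) {}

void readRowsSketch(const std::vector<Column> & columns, bool prefetch_enabled)
{
    std::unordered_set<std::string> prefetched_streams;

    // Phase 1 (optional): position every stream once and issue prefetches.
    if (prefetch_enabled)
        for (const auto & column : columns)
            prefetchColumn(column, prefetched_streams);

    // Phase 2: deserialize; if phase 1 ran, readColumn relies on the existing positions.
    for (const auto & column : columns)
        readColumn(column, /* was_prefetched = */ !prefetched_streams.empty());
}

int main()
{
    readRowsSketch({{"a"}, {"b"}}, /* prefetch_enabled = */ true);
}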