diff --git a/src/Compression/CachedCompressedReadBuffer.cpp b/src/Compression/CachedCompressedReadBuffer.cpp index 1b083c004c0..beb13d15f01 100644 --- a/src/Compression/CachedCompressedReadBuffer.cpp +++ b/src/Compression/CachedCompressedReadBuffer.cpp @@ -12,6 +12,7 @@ namespace DB namespace ErrorCodes { extern const int SEEK_POSITION_OUT_OF_BOUND; + extern const int LOGICAL_ERROR; } @@ -19,8 +20,9 @@ void CachedCompressedReadBuffer::initInput() { if (!file_in) { - file_in = file_in_creator(); - compressed_in = file_in.get(); + file_in_holder = file_in_creator(); + file_in = file_in_holder.get(); + compressed_in = file_in; if (profile_callback) file_in->setProfileCallback(profile_callback, clock_type); @@ -71,6 +73,14 @@ bool CachedCompressedReadBuffer::nextImpl() return true; } +CachedCompressedReadBuffer::CachedCompressedReadBuffer( + const std::string & path_, ReadBufferFromFileBase * file_in_, UncompressedCache * cache_) + : ReadBuffer(nullptr, 0), file_in(file_in_), cache(cache_), path(path_), file_pos(0) +{ + if (file_in == nullptr) + throw Exception("Neither file_in nor file_in_creator is initialized in CachedCompressedReadBuffer", ErrorCodes::LOGICAL_ERROR); +} + CachedCompressedReadBuffer::CachedCompressedReadBuffer( const std::string & path_, std::function()> file_in_creator_, UncompressedCache * cache_) : ReadBuffer(nullptr, 0), file_in_creator(std::move(file_in_creator_)), cache(cache_), path(path_), file_pos(0) diff --git a/src/Compression/CachedCompressedReadBuffer.h b/src/Compression/CachedCompressedReadBuffer.h index 88bcec8197d..2c5aa4920bd 100644 --- a/src/Compression/CachedCompressedReadBuffer.h +++ b/src/Compression/CachedCompressedReadBuffer.h @@ -22,7 +22,8 @@ class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadB private: std::function()> file_in_creator; UncompressedCache * cache; - std::unique_ptr file_in; + std::unique_ptr file_in_holder; + ReadBufferFromFileBase * file_in; const std::string path; size_t file_pos; @@ -38,6 +39,7 @@ private: clockid_t clock_type {}; public: + CachedCompressedReadBuffer(const std::string & path_, ReadBufferFromFileBase * file_in_, UncompressedCache * cache_); CachedCompressedReadBuffer(const std::string & path, std::function()> file_in_creator, UncompressedCache * cache_); void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block); diff --git a/src/Compression/CompressedReadBufferFromFile.cpp b/src/Compression/CompressedReadBufferFromFile.cpp index ddd8bba686f..2927ee1b399 100644 --- a/src/Compression/CompressedReadBufferFromFile.cpp +++ b/src/Compression/CompressedReadBufferFromFile.cpp @@ -37,6 +37,12 @@ bool CompressedReadBufferFromFile::nextImpl() return true; } +CompressedReadBufferFromFile::CompressedReadBufferFromFile(ReadBufferFromFileBase & file_in_) + : BufferWithOwnMemory(0), file_in(file_in_) +{ + compressed_in = &file_in; +} + CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr buf) : BufferWithOwnMemory(0), p_file_in(std::move(buf)), file_in(*p_file_in) { diff --git a/src/Compression/CompressedReadBufferFromFile.h b/src/Compression/CompressedReadBufferFromFile.h index 1729490f606..1de28062e41 100644 --- a/src/Compression/CompressedReadBufferFromFile.h +++ b/src/Compression/CompressedReadBufferFromFile.h @@ -28,6 +28,7 @@ private: bool nextImpl() override; public: + CompressedReadBufferFromFile(ReadBufferFromFileBase & buf); CompressedReadBufferFromFile(std::unique_ptr buf); CompressedReadBufferFromFile( diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp index 4357ee66a6e..920f171d7f9 100644 --- a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp @@ -45,40 +45,31 @@ MergeTreeReaderCompact::MergeTreeReaderCompact( { size_t buffer_size = settings.max_read_buffer_size; const String full_data_path = data_part->getFullRelativePath() + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION; + file_in = data_part->volume->getDisk()->readFile( + full_data_path, buffer_size, 0, + settings.min_bytes_to_use_direct_io, + settings.min_bytes_to_use_mmap_io); - if (uncompressed_cache) + auto full_path = fullPath(data_part->volume->getDisk(), full_data_path); + for (const auto & column : columns) { - auto buffer = std::make_unique( - fullPath(data_part->volume->getDisk(), full_data_path), - [this, full_data_path, buffer_size]() - { - return data_part->volume->getDisk()->readFile( - full_data_path, - buffer_size, - 0, - settings.min_bytes_to_use_direct_io, - settings.min_bytes_to_use_mmap_io); - }, - uncompressed_cache); + + std::unique_ptr cached_buffer; + std::unique_ptr non_cached_buffer; + if (uncompressed_cache) + { + cached_buffer = std::make_unique(full_path, file_in.get(), uncompressed_cache); + if (profile_callback_) + cached_buffer->setProfileCallback(profile_callback_, clock_type_); + } + else + { + non_cached_buffer = std::make_unique(*file_in); + if (profile_callback_) + non_cached_buffer->setProfileCallback(profile_callback_, clock_type_); + } - if (profile_callback_) - buffer->setProfileCallback(profile_callback_, clock_type_); - - cached_buffer = std::move(buffer); - data_buffer = cached_buffer.get(); - } - else - { - auto buffer = - std::make_unique( - data_part->volume->getDisk()->readFile( - full_data_path, buffer_size, 0, settings.min_bytes_to_use_direct_io, settings.min_bytes_to_use_mmap_io)); - - if (profile_callback_) - buffer->setProfileCallback(profile_callback_, clock_type_); - - non_cached_buffer = std::move(buffer); - data_buffer = non_cached_buffer.get(); + column_streams[column.name] = ColumnStream{std::move(cached_buffer), std::move(non_cached_buffer)}; } size_t columns_num = columns.size(); @@ -181,15 +172,16 @@ void MergeTreeReaderCompact::readData( const String & name, IColumn & column, const IDataType & type, size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets) { + auto & stream = column_streams[name]; if (!isContinuousReading(from_mark, column_position)) - seekToMark(from_mark, column_position); + seekToMark(stream, from_mark, column_position); auto buffer_getter = [&](const IDataType::SubstreamPath & substream_path) -> ReadBuffer * { if (only_offsets && (substream_path.size() != 1 || substream_path[0].type != IDataType::Substream::ArraySizes)) return nullptr; - return data_buffer; + return stream.data_buffer; }; IDataType::DeserializeBinaryBulkSettings deserialize_settings; @@ -209,15 +201,15 @@ void MergeTreeReaderCompact::readData( } -void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index) +void MergeTreeReaderCompact::seekToMark(ColumnStream & stream, size_t row_index, size_t column_index) { MarkInCompressedFile mark = marks_loader.getMark(row_index, column_index); try { - if (cached_buffer) - cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block); - if (non_cached_buffer) - non_cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block); + if (stream.cached_buffer) + stream.cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block); + if (stream.non_cached_buffer) + stream.non_cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block); } catch (Exception & e) { @@ -239,4 +231,16 @@ bool MergeTreeReaderCompact::isContinuousReading(size_t mark, size_t column_posi || (mark == last_mark + 1 && column_position == 0 && last_column == data_part->getColumns().size() - 1); } +MergeTreeReaderCompact::ColumnStream::ColumnStream( + std::unique_ptr cached_buffer_, + std::unique_ptr non_cached_buffer_) + : cached_buffer(std::move(cached_buffer_)) + , non_cached_buffer(std::move(non_cached_buffer_)) +{ + if (cached_buffer) + data_buffer = cached_buffer.get(); + else + data_buffer = non_cached_buffer.get(); +} + } diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.h b/src/Storages/MergeTree/MergeTreeReaderCompact.h index 0457b4b6a50..41682f8b0bd 100644 --- a/src/Storages/MergeTree/MergeTreeReaderCompact.h +++ b/src/Storages/MergeTree/MergeTreeReaderCompact.h @@ -2,6 +2,7 @@ #include #include +#include namespace DB @@ -35,9 +36,21 @@ public: private: bool isContinuousReading(size_t mark, size_t column_position); - ReadBuffer * data_buffer; - std::unique_ptr cached_buffer; - std::unique_ptr non_cached_buffer; + std::unique_ptr file_in; + + struct ColumnStream + { + std::unique_ptr cached_buffer; + std::unique_ptr non_cached_buffer; + ReadBuffer * data_buffer; + + ColumnStream() = default; + ColumnStream( + std::unique_ptr cached_buffer_, + std::unique_ptr non_cached_buffer_); + }; + + std::unordered_map column_streams; MergeTreeMarksLoader marks_loader; @@ -49,7 +62,7 @@ private: size_t next_mark = 0; std::optional> last_read_granule; - void seekToMark(size_t row_index, size_t column_index); + void seekToMark(ColumnStream & stream, size_t row_index, size_t column_index); void readData(const String & name, IColumn & column, const IDataType & type, size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets = false);