From 96bb607e6693398cee0bc85c57979919c1d80037 Mon Sep 17 00:00:00 2001 From: Pavel Kovalenko Date: Thu, 5 Mar 2020 17:01:19 +0300 Subject: [PATCH] Open file for reading lazily in CachedCompressedReadBuffer. --- .../CachedCompressedReadBuffer.cpp | 22 +++++-------------- .../Compression/CachedCompressedReadBuffer.h | 14 ++---------- .../MergeTree/MergeTreeReaderCompact.cpp | 15 +++++++++---- .../MergeTree/MergeTreeReaderStream.cpp | 11 ++++++++-- 4 files changed, 27 insertions(+), 35 deletions(-) diff --git a/dbms/src/Compression/CachedCompressedReadBuffer.cpp b/dbms/src/Compression/CachedCompressedReadBuffer.cpp index 00cb5fe3fe9..1b083c004c0 100644 --- a/dbms/src/Compression/CachedCompressedReadBuffer.cpp +++ b/dbms/src/Compression/CachedCompressedReadBuffer.cpp @@ -1,10 +1,10 @@ #include "CachedCompressedReadBuffer.h" -#include #include -#include #include +#include + namespace DB { @@ -19,7 +19,7 @@ void CachedCompressedReadBuffer::initInput() { if (!file_in) { - file_in = createReadBufferFromFileBase(path, estimated_size, aio_threshold, mmap_threshold, buf_size); + file_in = file_in_creator(); compressed_in = file_in.get(); if (profile_callback) @@ -71,24 +71,12 @@ bool CachedCompressedReadBuffer::nextImpl() return true; } - -CachedCompressedReadBuffer::CachedCompressedReadBuffer(std::unique_ptr file_in_, UncompressedCache * cache_) - : ReadBuffer(nullptr, 0), cache(cache_), file_in(std::move(file_in_)), path(file_in->getFileName()), file_pos(0) -{ - compressed_in = file_in.get(); -} - - CachedCompressedReadBuffer::CachedCompressedReadBuffer( - const std::string & path_, UncompressedCache * cache_, - size_t estimated_size_, size_t aio_threshold_, size_t mmap_threshold_, - size_t buf_size_) - : ReadBuffer(nullptr, 0), cache(cache_), path(path_), buf_size(buf_size_), estimated_size(estimated_size_), - aio_threshold(aio_threshold_), mmap_threshold(mmap_threshold_), file_pos(0) + const std::string & path_, std::function()> file_in_creator_, UncompressedCache * cache_) + : ReadBuffer(nullptr, 0), file_in_creator(std::move(file_in_creator_)), cache(cache_), path(path_), file_pos(0) { } - void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block) { if (owned_cell && diff --git a/dbms/src/Compression/CachedCompressedReadBuffer.h b/dbms/src/Compression/CachedCompressedReadBuffer.h index 4f9f1fce480..88bcec8197d 100644 --- a/dbms/src/Compression/CachedCompressedReadBuffer.h +++ b/dbms/src/Compression/CachedCompressedReadBuffer.h @@ -20,15 +20,11 @@ namespace DB class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadBuffer { private: + std::function()> file_in_creator; UncompressedCache * cache; std::unique_ptr file_in; const std::string path; - size_t buf_size {}; - size_t estimated_size {}; - size_t aio_threshold {}; - size_t mmap_threshold {}; - size_t file_pos; /// A piece of data from the cache, or a piece of read data that we put into the cache. @@ -42,13 +38,7 @@ private: clockid_t clock_type {}; public: - CachedCompressedReadBuffer(std::unique_ptr file_in, UncompressedCache * cache_); - - CachedCompressedReadBuffer( - const std::string & path_, UncompressedCache * cache_, - size_t estimated_size_, size_t aio_threshold_, size_t mmap_threshold_, - size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE); - + CachedCompressedReadBuffer(const std::string & path, std::function()> file_in_creator, UncompressedCache * cache_); void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block); diff --git a/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp b/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp index 8698ba78676..2b05cbf28d5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp @@ -39,10 +39,17 @@ MergeTreeReaderCompact::MergeTreeReaderCompact( if (uncompressed_cache) { - auto buffer = - std::make_unique( - data_part->disk->readFile(full_data_path, buffer_size, 0, settings.min_bytes_to_use_direct_io, 0), - uncompressed_cache); + auto buffer = std::make_unique( + full_data_path, + [&]() { + return data_part->disk->readFile( + full_data_path, + buffer_size, + 0, + settings.min_bytes_to_use_direct_io, + 0); + }, + uncompressed_cache); if (profile_callback_) buffer->setProfileCallback(profile_callback_, clock_type_); diff --git a/dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp index d0e73cd16e0..393e753a4ba 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp @@ -79,8 +79,15 @@ MergeTreeReaderStream::MergeTreeReaderStream( if (uncompressed_cache) { auto buffer = std::make_unique( - disk->readFile(path_prefix + data_file_extension, buffer_size, - sum_mark_range_bytes, settings.min_bytes_to_use_direct_io, settings.min_bytes_to_use_mmap_io), + path_prefix + data_file_extension, + [&]() { + return disk->readFile( + path_prefix + data_file_extension, + buffer_size, + sum_mark_range_bytes, + settings.min_bytes_to_use_direct_io, + settings.min_bytes_to_use_mmap_io); + }, uncompressed_cache); if (profile_callback)