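Open the file for reading lazily in CachedCompressedReadBuffer: callers now pass a factory callback, and the underlying read buffer is created in initInput() only if it does not exist yet, instead of being opened by the caller up front.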

This commit is contained in:
Pavel Kovalenko 2020-03-05 17:01:19 +03:00 committed by Pavel Kovalenko
parent 98cd70ced3
commit 96bb607e66
4 changed files with 27 additions and 35 deletions

View File

@ -1,10 +1,10 @@
#include "CachedCompressedReadBuffer.h" #include "CachedCompressedReadBuffer.h"
#include <IO/createReadBufferFromFileBase.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Compression/CompressionInfo.h>
#include <Compression/LZ4_decompress_faster.h> #include <Compression/LZ4_decompress_faster.h>
#include <utility>
namespace DB namespace DB
{ {
@ -19,7 +19,7 @@ void CachedCompressedReadBuffer::initInput()
{ {
if (!file_in) if (!file_in)
{ {
file_in = createReadBufferFromFileBase(path, estimated_size, aio_threshold, mmap_threshold, buf_size); file_in = file_in_creator();
compressed_in = file_in.get(); compressed_in = file_in.get();
if (profile_callback) if (profile_callback)
@ -71,24 +71,12 @@ bool CachedCompressedReadBuffer::nextImpl()
return true; return true;
} }
CachedCompressedReadBuffer::CachedCompressedReadBuffer(std::unique_ptr<ReadBufferFromFileBase> file_in_, UncompressedCache * cache_)
: ReadBuffer(nullptr, 0), cache(cache_), file_in(std::move(file_in_)), path(file_in->getFileName()), file_pos(0)
{
compressed_in = file_in.get();
}
CachedCompressedReadBuffer::CachedCompressedReadBuffer( CachedCompressedReadBuffer::CachedCompressedReadBuffer(
const std::string & path_, UncompressedCache * cache_, const std::string & path_, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator_, UncompressedCache * cache_)
size_t estimated_size_, size_t aio_threshold_, size_t mmap_threshold_, : ReadBuffer(nullptr, 0), file_in_creator(std::move(file_in_creator_)), cache(cache_), path(path_), file_pos(0)
size_t buf_size_)
: ReadBuffer(nullptr, 0), cache(cache_), path(path_), buf_size(buf_size_), estimated_size(estimated_size_),
aio_threshold(aio_threshold_), mmap_threshold(mmap_threshold_), file_pos(0)
{ {
} }
void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block) void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{ {
if (owned_cell && if (owned_cell &&

View File

@ -20,15 +20,11 @@ namespace DB
class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadBuffer class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadBuffer
{ {
private: private:
std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator;
UncompressedCache * cache; UncompressedCache * cache;
std::unique_ptr<ReadBufferFromFileBase> file_in; std::unique_ptr<ReadBufferFromFileBase> file_in;
const std::string path; const std::string path;
size_t buf_size {};
size_t estimated_size {};
size_t aio_threshold {};
size_t mmap_threshold {};
size_t file_pos; size_t file_pos;
/// A piece of data from the cache, or a piece of read data that we put into the cache. /// A piece of data from the cache, or a piece of read data that we put into the cache.
@ -42,13 +38,7 @@ private:
clockid_t clock_type {}; clockid_t clock_type {};
public: public:
CachedCompressedReadBuffer(std::unique_ptr<ReadBufferFromFileBase> file_in, UncompressedCache * cache_); CachedCompressedReadBuffer(const std::string & path, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator, UncompressedCache * cache_);
CachedCompressedReadBuffer(
const std::string & path_, UncompressedCache * cache_,
size_t estimated_size_, size_t aio_threshold_, size_t mmap_threshold_,
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block); void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);

View File

@ -39,10 +39,17 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
if (uncompressed_cache) if (uncompressed_cache)
{ {
auto buffer = auto buffer = std::make_unique<CachedCompressedReadBuffer>(
std::make_unique<CachedCompressedReadBuffer>( full_data_path,
data_part->disk->readFile(full_data_path, buffer_size, 0, settings.min_bytes_to_use_direct_io, 0), [&]() {
uncompressed_cache); return data_part->disk->readFile(
full_data_path,
buffer_size,
0,
settings.min_bytes_to_use_direct_io,
0);
},
uncompressed_cache);
if (profile_callback_) if (profile_callback_)
buffer->setProfileCallback(profile_callback_, clock_type_); buffer->setProfileCallback(profile_callback_, clock_type_);

View File

@ -79,8 +79,15 @@ MergeTreeReaderStream::MergeTreeReaderStream(
if (uncompressed_cache) if (uncompressed_cache)
{ {
auto buffer = std::make_unique<CachedCompressedReadBuffer>( auto buffer = std::make_unique<CachedCompressedReadBuffer>(
disk->readFile(path_prefix + data_file_extension, buffer_size, path_prefix + data_file_extension,
sum_mark_range_bytes, settings.min_bytes_to_use_direct_io, settings.min_bytes_to_use_mmap_io), [&]() {
return disk->readFile(
path_prefix + data_file_extension,
buffer_size,
sum_mark_range_bytes,
settings.min_bytes_to_use_direct_io,
settings.min_bytes_to_use_mmap_io);
},
uncompressed_cache); uncompressed_cache);
if (profile_callback) if (profile_callback)