ClickHouse/dbms/src/IO/CachedCompressedReadBuffer.cpp

111 lines
3.4 KiB
C++
Raw Normal View History

2018-09-04 19:34:34 +00:00
#include <IO/createReadBufferFromFileBase.h>
#include <IO/CachedCompressedReadBuffer.h>
#include <IO/WriteHelpers.h>
2018-01-15 05:54:28 +00:00
#include <IO/CompressedStream.h>
2018-01-16 01:59:51 +00:00
#include <IO/LZ4_decompress_faster.h>
2018-10-11 02:57:48 +00:00
#include "CachedCompressedReadBuffer.h"
2016-10-25 06:49:24 +00:00
namespace DB
{
/// Error codes used in this file. Only declared here (extern); the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int SEEK_POSITION_OUT_OF_BOUND;
}
void CachedCompressedReadBuffer::initInput()
{
if (!file_in)
{
file_in = createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size);
2018-12-19 17:20:18 +00:00
in = liftCompressed(codec, *file_in);
if (profile_callback)
file_in->setProfileCallback(profile_callback, clock_type);
}
2016-10-25 06:49:24 +00:00
}
bool CachedCompressedReadBuffer::nextImpl()
{
/// Let's check for the presence of a decompressed block in the cache, grab the ownership of this block, if it exists.
UInt128 key = cache->hash(path, file_pos);
owned_cell = cache->get(key);
2016-10-25 06:49:24 +00:00
if (!owned_cell)
{
/// If not, read it from the file.
initInput();
file_in->seek(file_pos);
2016-10-25 06:49:24 +00:00
owned_cell = std::make_shared<UncompressedCacheCell>();
2016-10-25 06:49:24 +00:00
2018-12-19 17:20:18 +00:00
UInt32 size_decompressed;
std::tie(owned_cell->compressed_size, size_decompressed) = in->readCompressedData();
2016-10-25 06:49:24 +00:00
if (owned_cell->compressed_size)
{
2018-01-16 01:59:51 +00:00
owned_cell->data.resize(size_decompressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
2018-12-19 17:20:18 +00:00
in->decompress(owned_cell->data.data(), owned_cell->compressed_size);
2018-10-11 02:57:48 +00:00
in->buffer() = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
2018-12-19 17:20:18 +00:00
in->decompress(owned_cell->data.data(), owned_cell->compressed_size);
2016-10-25 06:49:24 +00:00
/// Put data into cache.
cache->set(key, owned_cell);
}
}
2016-10-25 06:49:24 +00:00
2018-06-12 03:25:54 +00:00
if (owned_cell->data.size() == 0)
{
owned_cell = nullptr;
return false;
}
2016-10-25 06:49:24 +00:00
2018-06-12 03:25:54 +00:00
working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
2016-10-25 06:49:24 +00:00
file_pos += owned_cell->compressed_size;
2016-10-25 06:49:24 +00:00
return true;
2016-10-25 06:49:24 +00:00
}
/// Stores the construction parameters and starts at position 0 of the compressed file.
/// The base ReadBuffer is created with a null pointer and zero size; the constructor body is empty,
/// so no file is opened at this point.
CachedCompressedReadBuffer::CachedCompressedReadBuffer(
    const std::string & path_, UncompressedCache * cache_, const CompressionCodecPtr & codec_,
    size_t estimated_size_, size_t aio_threshold_, size_t buf_size_)
    : ReadBuffer(nullptr, 0), path(path_), cache(cache_), codec(codec_), buf_size(buf_size_), estimated_size(estimated_size_),
    aio_threshold(aio_threshold_), file_pos(0)
{
}
/// Moves the read position to `offset_in_decompressed_block` bytes inside the block
/// that starts at `offset_in_compressed_file` in the compressed file.
/// Throws an Exception with code SEEK_POSITION_OUT_OF_BOUND if, after loading the block,
/// the requested offset is larger than the working buffer.
void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{
    /// `file_pos` already points past the current block, so its start is file_pos - compressed_size.
    if (owned_cell &&
        offset_in_compressed_file == file_pos - owned_cell->compressed_size &&
        offset_in_decompressed_block <= working_buffer.size())
    {
        /// The target is inside the block we already hold: only move `pos`.
        /// `bytes` is adjusted by offset() before and after, so bytes + offset() stays the same.
        bytes += offset();
        pos = working_buffer.begin() + offset_in_decompressed_block;
        bytes -= offset();
    }
    else
    {
        /// Another block is needed: load the one starting at the requested file position.
        file_pos = offset_in_compressed_file;

        bytes += offset();

        /// NOTE(review): the return value of nextImpl() is ignored. When it returns false,
        /// working_buffer is left as it was - confirm callers never seek to a position with no block.
        nextImpl();

        if (offset_in_decompressed_block > working_buffer.size())
            throw Exception("Seek position is beyond the decompressed block"
                " (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")",
                ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);

        pos = working_buffer.begin() + offset_in_decompressed_block;
        bytes -= offset();
    }
}
}