ClickHouse/dbms/include/DB/IO/CachedCompressedReadBuffer.h

142 lines
4.1 KiB
C++
Raw Normal View History

#pragma once
#include <memory>
#include <DB/IO/createReadBufferFromFileBase.h>
#include <DB/IO/CompressedReadBufferBase.h>
#include <DB/IO/UncompressedCache.h>
namespace DB
{
2016-01-12 02:42:18 +00:00
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
}
/** Буфер для чтения из сжатого файла с использованием кэша разжатых блоков.
* Кэш внешний - передаётся в качестве аргумента в конструктор.
* Позволяет увеличить производительность в случае, когда часто читаются одни и те же блоки.
* Недостатки:
* - в случае, если нужно читать много данных подряд, но из них только часть закэширована, приходится делать seek-и.
*/
class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadBuffer
{
private:
const std::string path;
2013-11-26 11:55:11 +00:00
UncompressedCache * cache;
size_t buf_size;
2015-04-13 15:02:39 +00:00
size_t estimated_size;
size_t aio_threshold;
std::unique_ptr<ReadBufferFromFileBase> file_in;
size_t file_pos;
/// Кусок данных из кэша, или кусок считанных данных, который мы положим в кэш.
UncompressedCache::MappedPtr owned_cell;
void initInput()
{
if (!file_in)
{
file_in.reset(createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size));
compressed_in = &*file_in;
if (profile_callback)
file_in->setProfileCallback(profile_callback, clock_type);
}
}
bool nextImpl()
{
/// Проверим наличие разжатого блока в кэше, захватим владение этим блоком, если он есть.
UInt128 key = cache->hash(path, file_pos);
owned_cell = cache->get(key);
if (!owned_cell)
{
/// Если нет - надо прочитать его из файла.
initInput();
file_in->seek(file_pos);
owned_cell.reset(new UncompressedCacheCell);
size_t size_decompressed;
size_t size_compressed_without_checksum;
owned_cell->compressed_size = readCompressedData(size_decompressed, size_compressed_without_checksum);
if (owned_cell->compressed_size)
{
owned_cell->data.resize(size_decompressed);
decompress(owned_cell->data.m_data, size_decompressed, size_compressed_without_checksum);
/// Положим данные в кэш.
cache->set(key, owned_cell);
}
2013-11-26 11:55:11 +00:00
}
if (owned_cell->data.m_size == 0)
{
2014-04-08 07:31:51 +00:00
owned_cell = nullptr;
return false;
}
working_buffer = Buffer(owned_cell->data.m_data, owned_cell->data.m_data + owned_cell->data.m_size);
file_pos += owned_cell->compressed_size;
return true;
}
/// Передаётся в file_in.
ReadBufferFromFileBase::ProfileCallback profile_callback;
clockid_t clock_type;
public:
2015-07-08 17:59:44 +00:00
CachedCompressedReadBuffer(
const std::string & path_, UncompressedCache * cache_, size_t estimated_size_, size_t aio_threshold_,
2015-09-03 12:07:46 +00:00
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBuffer(nullptr, 0), path(path_), cache(cache_), buf_size(buf_size_), estimated_size(estimated_size_),
aio_threshold(aio_threshold_), file_pos(0)
2013-11-26 11:55:11 +00:00
{
}
2013-11-26 11:55:11 +00:00
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{
if (owned_cell &&
offset_in_compressed_file == file_pos - owned_cell->compressed_size &&
offset_in_decompressed_block <= working_buffer.size())
2013-11-26 11:55:11 +00:00
{
bytes += offset();
2013-11-26 11:55:11 +00:00
pos = working_buffer.begin() + offset_in_decompressed_block;
bytes -= offset();
2013-11-26 11:55:11 +00:00
}
else
{
file_pos = offset_in_compressed_file;
bytes += offset();
nextImpl();
2013-11-26 11:55:11 +00:00
if (offset_in_decompressed_block > working_buffer.size())
throw Exception("Seek position is beyond the decompressed block", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
pos = working_buffer.begin() + offset_in_decompressed_block;
bytes -= offset();
2013-11-26 11:55:11 +00:00
}
}
void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE)
{
profile_callback = profile_callback_;
clock_type = clock_type_;
}
};
}