fix reading from compact parts with different codecs

This commit is contained in:
Anton Popov 2020-07-10 18:57:10 +03:00
parent 80a62977f1
commit 24f627e52c
6 changed files with 81 additions and 45 deletions

View File

@ -12,6 +12,7 @@ namespace DB
namespace ErrorCodes
{
extern const int SEEK_POSITION_OUT_OF_BOUND;
extern const int LOGICAL_ERROR;
}
@ -19,8 +20,9 @@ void CachedCompressedReadBuffer::initInput()
{
if (!file_in)
{
file_in = file_in_creator();
compressed_in = file_in.get();
file_in_holder = file_in_creator();
file_in = file_in_holder.get();
compressed_in = file_in;
if (profile_callback)
file_in->setProfileCallback(profile_callback, clock_type);
@ -71,6 +73,14 @@ bool CachedCompressedReadBuffer::nextImpl()
return true;
}
CachedCompressedReadBuffer::CachedCompressedReadBuffer(
const std::string & path_, ReadBufferFromFileBase * file_in_, UncompressedCache * cache_)
: ReadBuffer(nullptr, 0), file_in(file_in_), cache(cache_), path(path_), file_pos(0)
{
if (file_in == nullptr)
throw Exception("Neither file_in nor file_in_creator is initialized in CachedCompressedReadBuffer", ErrorCodes::LOGICAL_ERROR);
}
CachedCompressedReadBuffer::CachedCompressedReadBuffer(
const std::string & path_, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator_, UncompressedCache * cache_)
: ReadBuffer(nullptr, 0), file_in_creator(std::move(file_in_creator_)), cache(cache_), path(path_), file_pos(0)

View File

@ -22,7 +22,8 @@ class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadB
private:
std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator;
UncompressedCache * cache;
std::unique_ptr<ReadBufferFromFileBase> file_in;
std::unique_ptr<ReadBufferFromFileBase> file_in_holder;
ReadBufferFromFileBase * file_in;
const std::string path;
size_t file_pos;
@ -38,6 +39,7 @@ private:
clockid_t clock_type {};
public:
CachedCompressedReadBuffer(const std::string & path_, ReadBufferFromFileBase * file_in_, UncompressedCache * cache_);
CachedCompressedReadBuffer(const std::string & path, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator, UncompressedCache * cache_);
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);

View File

@ -37,6 +37,12 @@ bool CompressedReadBufferFromFile::nextImpl()
return true;
}
CompressedReadBufferFromFile::CompressedReadBufferFromFile(ReadBufferFromFileBase & file_in_)
: BufferWithOwnMemory<ReadBuffer>(0), file_in(file_in_)
{
compressed_in = &file_in;
}
CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf)
: BufferWithOwnMemory<ReadBuffer>(0), p_file_in(std::move(buf)), file_in(*p_file_in)
{

View File

@ -28,6 +28,7 @@ private:
bool nextImpl() override;
public:
CompressedReadBufferFromFile(ReadBufferFromFileBase & buf);
CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf);
CompressedReadBufferFromFile(

View File

@ -45,40 +45,31 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
{
size_t buffer_size = settings.max_read_buffer_size;
const String full_data_path = data_part->getFullRelativePath() + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
file_in = data_part->volume->getDisk()->readFile(
full_data_path, buffer_size, 0,
settings.min_bytes_to_use_direct_io,
settings.min_bytes_to_use_mmap_io);
if (uncompressed_cache)
auto full_path = fullPath(data_part->volume->getDisk(), full_data_path);
for (const auto & column : columns)
{
auto buffer = std::make_unique<CachedCompressedReadBuffer>(
fullPath(data_part->volume->getDisk(), full_data_path),
[this, full_data_path, buffer_size]()
{
return data_part->volume->getDisk()->readFile(
full_data_path,
buffer_size,
0,
settings.min_bytes_to_use_direct_io,
settings.min_bytes_to_use_mmap_io);
},
uncompressed_cache);
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
if (uncompressed_cache)
{
cached_buffer = std::make_unique<CachedCompressedReadBuffer>(full_path, file_in.get(), uncompressed_cache);
if (profile_callback_)
cached_buffer->setProfileCallback(profile_callback_, clock_type_);
}
else
{
non_cached_buffer = std::make_unique<CompressedReadBufferFromFile>(*file_in);
if (profile_callback_)
non_cached_buffer->setProfileCallback(profile_callback_, clock_type_);
}
if (profile_callback_)
buffer->setProfileCallback(profile_callback_, clock_type_);
cached_buffer = std::move(buffer);
data_buffer = cached_buffer.get();
}
else
{
auto buffer =
std::make_unique<CompressedReadBufferFromFile>(
data_part->volume->getDisk()->readFile(
full_data_path, buffer_size, 0, settings.min_bytes_to_use_direct_io, settings.min_bytes_to_use_mmap_io));
if (profile_callback_)
buffer->setProfileCallback(profile_callback_, clock_type_);
non_cached_buffer = std::move(buffer);
data_buffer = non_cached_buffer.get();
column_streams[column.name] = ColumnStream{std::move(cached_buffer), std::move(non_cached_buffer)};
}
size_t columns_num = columns.size();
@ -181,15 +172,16 @@ void MergeTreeReaderCompact::readData(
const String & name, IColumn & column, const IDataType & type,
size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets)
{
auto & stream = column_streams[name];
if (!isContinuousReading(from_mark, column_position))
seekToMark(from_mark, column_position);
seekToMark(stream, from_mark, column_position);
auto buffer_getter = [&](const IDataType::SubstreamPath & substream_path) -> ReadBuffer *
{
if (only_offsets && (substream_path.size() != 1 || substream_path[0].type != IDataType::Substream::ArraySizes))
return nullptr;
return data_buffer;
return stream.data_buffer;
};
IDataType::DeserializeBinaryBulkSettings deserialize_settings;
@ -209,15 +201,15 @@ void MergeTreeReaderCompact::readData(
}
void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index)
void MergeTreeReaderCompact::seekToMark(ColumnStream & stream, size_t row_index, size_t column_index)
{
MarkInCompressedFile mark = marks_loader.getMark(row_index, column_index);
try
{
if (cached_buffer)
cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
if (non_cached_buffer)
non_cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
if (stream.cached_buffer)
stream.cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
if (stream.non_cached_buffer)
stream.non_cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
}
catch (Exception & e)
{
@ -239,4 +231,16 @@ bool MergeTreeReaderCompact::isContinuousReading(size_t mark, size_t column_posi
|| (mark == last_mark + 1 && column_position == 0 && last_column == data_part->getColumns().size() - 1);
}
MergeTreeReaderCompact::ColumnStream::ColumnStream(
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer_,
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer_)
: cached_buffer(std::move(cached_buffer_))
, non_cached_buffer(std::move(non_cached_buffer_))
{
if (cached_buffer)
data_buffer = cached_buffer.get();
else
data_buffer = non_cached_buffer.get();
}
}

View File

@ -2,6 +2,7 @@
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <IO/ReadBufferFromFileBase.h>
namespace DB
@ -35,9 +36,21 @@ public:
private:
bool isContinuousReading(size_t mark, size_t column_position);
ReadBuffer * data_buffer;
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
std::unique_ptr<ReadBufferFromFileBase> file_in;
struct ColumnStream
{
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
ReadBuffer * data_buffer;
ColumnStream() = default;
ColumnStream(
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer_,
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer_);
};
std::unordered_map<String, ColumnStream> column_streams;
MergeTreeMarksLoader marks_loader;
@ -49,7 +62,7 @@ private:
size_t next_mark = 0;
std::optional<std::pair<size_t, size_t>> last_read_granule;
void seekToMark(size_t row_index, size_t column_index);
void seekToMark(ColumnStream & stream, size_t row_index, size_t column_index);
void readData(const String & name, IColumn & column, const IDataType & type,
size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets = false);