better reading from compact parts with different codecs

Anton Popov 2020-09-04 15:48:55 +03:00
parent 6883ee7eea
commit 68913eab62
10 changed files with 104 additions and 103 deletions
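
For orientation, the core of the change is a relaxed per-block codec check in CompressedReadBufferBase::readCompressedData and CompressedReadBufferBase::decompress, plumbed through a new allow_different_codecs flag and used by MergeTreeReaderCompact, since a compact part keeps all columns, each with its own codec, in a single data file. Below is a condensed, self-contained C++ sketch of that check; Codec, getCodecByMethod and updateCodec are illustrative stand-ins for ICompressionCodec and CompressionCodecFactory, and only the branching mirrors the diff.

#include <cstdint>
#include <memory>
#include <stdexcept>
#include <string>

struct Codec
{
    explicit Codec(uint8_t method_byte_) : method_byte(method_byte_) {}
    uint8_t getMethodByte() const { return method_byte; }
    uint8_t method_byte;
};

using CodecPtr = std::shared_ptr<Codec>;

/// Stand-in for CompressionCodecFactory::instance().get(method).
CodecPtr getCodecByMethod(uint8_t method) { return std::make_shared<Codec>(method); }

/// Called once per compressed block header. Before this commit any change of the
/// method byte threw CANNOT_DECOMPRESS; now, when allow_different_codecs is set,
/// the codec is simply re-resolved, so one file may interleave blocks written by
/// different codecs. (The real exception message prints the bytes in hex.)
void updateCodec(CodecPtr & codec, uint8_t method, bool allow_different_codecs)
{
    if (!codec)
    {
        codec = getCodecByMethod(method);
    }
    else if (method != codec->getMethodByte())
    {
        if (allow_different_codecs)
            codec = getCodecByMethod(method);
        else
            throw std::runtime_error(
                "Data compressed with different methods, given method byte "
                + std::to_string(method) + ", previous method byte "
                + std::to_string(codec->getMethodByte()));
    }
}

int main()
{
    CodecPtr codec;
    updateCodec(codec, 0x82, /* allow_different_codecs = */ true);  /// first block sets the codec
    updateCodec(codec, 0x90, /* allow_different_codecs = */ true);  /// new method byte: codec is re-resolved
    /// With the flag unset, the second call would throw instead.
}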

View File

@@ -12,7 +12,6 @@ namespace DB
namespace ErrorCodes
{
extern const int SEEK_POSITION_OUT_OF_BOUND;
extern const int LOGICAL_ERROR;
}
@@ -20,9 +19,8 @@ void CachedCompressedReadBuffer::initInput()
{
if (!file_in)
{
file_in_holder = file_in_creator();
file_in = file_in_holder.get();
compressed_in = file_in;
file_in = file_in_creator();
compressed_in = file_in.get();
if (profile_callback)
file_in->setProfileCallback(profile_callback, clock_type);
@@ -74,19 +72,10 @@ bool CachedCompressedReadBuffer::nextImpl()
}
CachedCompressedReadBuffer::CachedCompressedReadBuffer(
const std::string & path_, ReadBufferFromFileBase * file_in_, UncompressedCache * cache_)
: ReadBuffer(nullptr, 0), file_in(file_in_), cache(cache_), path(path_), file_pos(0)
{
if (file_in == nullptr)
throw Exception("Neither file_in nor file_in_creator is initialized in CachedCompressedReadBuffer", ErrorCodes::LOGICAL_ERROR);
compressed_in = file_in;
}
CachedCompressedReadBuffer::CachedCompressedReadBuffer(
const std::string & path_, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator_, UncompressedCache * cache_)
const std::string & path_, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator_, UncompressedCache * cache_, bool allow_different_codecs_)
: ReadBuffer(nullptr, 0), file_in_creator(std::move(file_in_creator_)), cache(cache_), path(path_), file_pos(0)
{
allow_different_codecs = allow_different_codecs_;
}
void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)

View File

@@ -21,9 +21,8 @@ class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadB
{
private:
std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator;
std::unique_ptr<ReadBufferFromFileBase> file_in_holder;
ReadBufferFromFileBase * file_in = nullptr;
UncompressedCache * cache;
std::unique_ptr<ReadBufferFromFileBase> file_in;
const std::string path;
size_t file_pos;
@@ -39,8 +38,7 @@ private:
clockid_t clock_type {};
public:
CachedCompressedReadBuffer(const std::string & path_, ReadBufferFromFileBase * file_in_, UncompressedCache * cache_);
CachedCompressedReadBuffer(const std::string & path, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator, UncompressedCache * cache_);
CachedCompressedReadBuffer(const std::string & path, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator, UncompressedCache * cache_, bool allow_different_codecs_ = false);
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);

View File

@@ -105,13 +105,24 @@ size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed,
uint8_t method = ICompressionCodec::readMethod(own_compressed_buffer.data());
if (!codec)
{
codec = CompressionCodecFactory::instance().get(method);
}
else if (method != codec->getMethodByte())
throw Exception("Data compressed with different methods, given method byte 0x"
+ getHexUIntLowercase(method)
+ ", previous method byte 0x"
+ getHexUIntLowercase(codec->getMethodByte()),
ErrorCodes::CANNOT_DECOMPRESS);
{
if (allow_different_codecs)
{
codec = CompressionCodecFactory::instance().get(method);
}
else
{
throw Exception("Data compressed with different methods, given method byte 0x"
+ getHexUIntLowercase(method)
+ ", previous method byte 0x"
+ getHexUIntLowercase(codec->getMethodByte()),
ErrorCodes::CANNOT_DECOMPRESS);
}
}
size_compressed_without_checksum = ICompressionCodec::readCompressedBlockSize(own_compressed_buffer.data());
size_decompressed = ICompressionCodec::readDecompressedBlockSize(own_compressed_buffer.data());
@@ -163,21 +174,32 @@ void CompressedReadBufferBase::decompress(char * to, size_t size_decompressed, s
uint8_t method = ICompressionCodec::readMethod(compressed_buffer);
if (!codec)
{
codec = CompressionCodecFactory::instance().get(method);
}
else if (codec->getMethodByte() != method)
throw Exception("Data compressed with different methods, given method byte "
+ getHexUIntLowercase(method)
+ ", previous method byte "
+ getHexUIntLowercase(codec->getMethodByte()),
ErrorCodes::CANNOT_DECOMPRESS);
{
if (allow_different_codecs)
{
codec = CompressionCodecFactory::instance().get(method);
}
else
{
throw Exception("Data compressed with different methods, given method byte "
+ getHexUIntLowercase(method)
+ ", previous method byte "
+ getHexUIntLowercase(codec->getMethodByte()),
ErrorCodes::CANNOT_DECOMPRESS);
}
}
codec->decompress(compressed_buffer, size_compressed_without_checksum, to);
}
/// 'compressed_in' could be initialized lazily, but before the first call of 'readCompressedData'.
CompressedReadBufferBase::CompressedReadBufferBase(ReadBuffer * in)
: compressed_in(in), own_compressed_buffer(0)
CompressedReadBufferBase::CompressedReadBufferBase(ReadBuffer * in, bool allow_different_codecs_)
: compressed_in(in), own_compressed_buffer(0), allow_different_codecs(allow_different_codecs_)
{
}

View File

@@ -26,6 +26,9 @@ protected:
/// Don't checksum on decompressing.
bool disable_checksum = false;
/// Allow reading data compressed by different codecs from one file.
bool allow_different_codecs;
/// Read compressed data into compressed_buffer. Get size of decompressed data from block header. Checksum if needed.
/// Returns number of compressed bytes read.
size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum);
@@ -34,7 +37,7 @@ protected:
public:
/// 'compressed_in' could be initialized lazily, but before the first call of 'readCompressedData'.
CompressedReadBufferBase(ReadBuffer * in = nullptr);
CompressedReadBufferBase(ReadBuffer * in = nullptr, bool allow_different_codecs_ = false);
~CompressedReadBufferBase();
/** Disable checksums.

View File

@@ -36,26 +36,22 @@ bool CompressedReadBufferFromFile::nextImpl()
return true;
}
CompressedReadBufferFromFile::CompressedReadBufferFromFile(ReadBufferFromFileBase & file_in_)
: BufferWithOwnMemory<ReadBuffer>(0), file_in(file_in_)
{
compressed_in = &file_in;
}
CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf)
CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf, bool allow_different_codecs_)
: BufferWithOwnMemory<ReadBuffer>(0), p_file_in(std::move(buf)), file_in(*p_file_in)
{
compressed_in = &file_in;
allow_different_codecs = allow_different_codecs_;
}
CompressedReadBufferFromFile::CompressedReadBufferFromFile(
const std::string & path, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, size_t buf_size)
const std::string & path, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, size_t buf_size, bool allow_different_codecs_)
: BufferWithOwnMemory<ReadBuffer>(0)
, p_file_in(createReadBufferFromFileBase(path, estimated_size, aio_threshold, mmap_threshold, buf_size))
, file_in(*p_file_in)
{
compressed_in = &file_in;
allow_different_codecs = allow_different_codecs_;
}

View File

@@ -28,11 +28,11 @@ private:
bool nextImpl() override;
public:
CompressedReadBufferFromFile(ReadBufferFromFileBase & buf);
CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf);
CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf, bool allow_different_codecs_ = false);
CompressedReadBufferFromFile(
const std::string & path, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
const std::string & path, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, bool allow_different_codecs_ = false);
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);
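
A hypothetical call site for the path-based constructor declared above; the helper name, placeholder path handling and zeroed I/O thresholds are illustrative, and the ClickHouse headers are assumed:

#include <memory>
#include <string>

#include <Core/Defines.h>
#include <Compression/CompressedReadBufferFromFile.h>

/// Open one compressed file whose blocks may have been written by different codecs,
/// e.g. the single data file of a compact part, where every column keeps its own codec.
std::unique_ptr<DB::CompressedReadBufferFromFile> openMixedCodecFile(const std::string & path)
{
    return std::make_unique<DB::CompressedReadBufferFromFile>(
        path,
        /* estimated_size = */ 0,
        /* aio_threshold = */ 0,
        /* mmap_threshold = */ 0,
        /* buf_size = */ DBMS_DEFAULT_BUFFER_SIZE,
        /* allow_different_codecs_ = */ true);
}

MergeTreeReaderCompact itself uses the unique_ptr-taking overload (or the cached variant) with the same flag, as the changes further down in this commit show.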

View File

@@ -73,31 +73,41 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
buffer_size = settings.max_read_buffer_size;
const String full_data_path = data_part->getFullRelativePath() + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
file_in = data_part->volume->getDisk()->readFile(
full_data_path, buffer_size, 0,
settings.min_bytes_to_use_direct_io,
settings.min_bytes_to_use_mmap_io);
auto full_path = fullPath(data_part->volume->getDisk(), full_data_path);
for (const auto & column : columns)
if (uncompressed_cache)
{
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
if (uncompressed_cache)
{
cached_buffer = std::make_unique<CachedCompressedReadBuffer>(full_path, file_in.get(), uncompressed_cache);
if (profile_callback_)
cached_buffer->setProfileCallback(profile_callback_, clock_type_);
}
else
{
non_cached_buffer = std::make_unique<CompressedReadBufferFromFile>(*file_in);
if (profile_callback_)
non_cached_buffer->setProfileCallback(profile_callback_, clock_type_);
}
auto buffer = std::make_unique<CachedCompressedReadBuffer>(
fullPath(data_part->volume->getDisk(), full_data_path),
[this, full_data_path, buffer_size]()
{
return data_part->volume->getDisk()->readFile(
full_data_path,
buffer_size,
0,
settings.min_bytes_to_use_direct_io,
settings.min_bytes_to_use_mmap_io);
},
uncompressed_cache,
/* allow_different_codecs = */ true);
auto column_from_part = getColumnFromPart(column);
column_streams[column_from_part.name] = ColumnStream{std::move(cached_buffer), std::move(non_cached_buffer)};
if (profile_callback_)
buffer->setProfileCallback(profile_callback_, clock_type_);
cached_buffer = std::move(buffer);
data_buffer = cached_buffer.get();
}
else
{
auto buffer =
std::make_unique<CompressedReadBufferFromFile>(
data_part->volume->getDisk()->readFile(
full_data_path, buffer_size, 0, settings.min_bytes_to_use_direct_io, settings.min_bytes_to_use_mmap_io),
/* allow_different_codecs = */ true);
if (profile_callback_)
buffer->setProfileCallback(profile_callback_, clock_type_);
non_cached_buffer = std::move(buffer);
data_buffer = non_cached_buffer.get();
}
}
catch (...)
@@ -192,16 +202,15 @@ void MergeTreeReaderCompact::readData(
const String & name, IColumn & column, const IDataType & type,
size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets)
{
auto & stream = column_streams[name];
if (!isContinuousReading(from_mark, column_position))
seekToMark(stream, from_mark, column_position);
seekToMark(from_mark, column_position);
auto buffer_getter = [&](const IDataType::SubstreamPath & substream_path) -> ReadBuffer *
{
if (only_offsets && (substream_path.size() != 1 || substream_path[0].type != IDataType::Substream::ArraySizes))
return nullptr;
return stream.data_buffer;
return data_buffer;
};
IDataType::DeserializeBinaryBulkSettings deserialize_settings;
@@ -221,15 +230,15 @@ void MergeTreeReaderCompact::readData(
}
void MergeTreeReaderCompact::seekToMark(ColumnStream & stream, size_t row_index, size_t column_index)
void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index)
{
MarkInCompressedFile mark = marks_loader.getMark(row_index, column_index);
try
{
if (stream.cached_buffer)
stream.cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
if (stream.non_cached_buffer)
stream.non_cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
if (cached_buffer)
cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
if (non_cached_buffer)
non_cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
}
catch (Exception & e)
{
@@ -241,7 +250,6 @@ void MergeTreeReaderCompact::seekToMark(ColumnStream & stream, size_t row_index,
}
}
bool MergeTreeReaderCompact::isContinuousReading(size_t mark, size_t column_position)
{
if (!last_read_granule)
@@ -251,18 +259,6 @@ bool MergeTreeReaderCompact::isContinuousReading(size_t mark, size_t column_posi
|| (mark == last_mark + 1 && column_position == 0 && last_column == data_part->getColumns().size() - 1);
}
MergeTreeReaderCompact::ColumnStream::ColumnStream(
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer_,
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer_)
: cached_buffer(std::move(cached_buffer_))
, non_cached_buffer(std::move(non_cached_buffer_))
{
if (cached_buffer)
data_buffer = cached_buffer.get();
else
data_buffer = non_cached_buffer.get();
}
namespace
{

View File

@@ -39,21 +39,9 @@ public:
private:
bool isContinuousReading(size_t mark, size_t column_position);
std::unique_ptr<ReadBufferFromFileBase> file_in;
struct ColumnStream
{
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
ReadBuffer * data_buffer;
ColumnStream() = default;
ColumnStream(
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer_,
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer_);
};
std::unordered_map<String, ColumnStream> column_streams;
ReadBuffer * data_buffer;
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
MergeTreeMarksLoader marks_loader;
@@ -66,7 +54,7 @@ private:
size_t next_mark = 0;
std::optional<std::pair<size_t, size_t>> last_read_granule;
void seekToMark(ColumnStream & stream, size_t row_index, size_t column_index);
void seekToMark(size_t row_index, size_t column_index);
void readData(const String & name, IColumn & column, const IDataType & type,
size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets = false);

View File

@@ -1,3 +1,6 @@
12000 11890
499500 499500 999
11965 11890
499500 499500 999
5858 11890
499500 499500 999

View File

@@ -8,6 +8,8 @@ SELECT sum(data_compressed_bytes), sum(data_uncompressed_bytes)
FROM system.parts
WHERE table = 'codecs' AND database = currentDatabase();
SELECT sum(id), sum(val), max(s) FROM codecs;
DROP TABLE codecs;
CREATE TABLE codecs (id UInt32 CODEC(NONE), val UInt32 CODEC(NONE), s String CODEC(NONE))
@@ -18,6 +20,8 @@ SELECT sum(data_compressed_bytes), sum(data_uncompressed_bytes)
FROM system.parts
WHERE table = 'codecs' AND database = currentDatabase();
SELECT sum(id), sum(val), max(s) FROM codecs;
DROP TABLE codecs;
CREATE TABLE codecs (id UInt32, val UInt32 CODEC(Delta, ZSTD), s String CODEC(ZSTD))
@@ -28,4 +32,6 @@ SELECT sum(data_compressed_bytes), sum(data_uncompressed_bytes)
FROM system.parts
WHERE table = 'codecs' AND database = currentDatabase();
SELECT sum(id), sum(val), max(s) FROM codecs;
DROP TABLE codecs;
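
As a sanity check on the reference file above: assuming the table is filled with 1000 consecutive values 0..999 (the INSERT is outside the shown hunks), each added SELECT sum(id), sum(val), max(s) returns sum(0..999) = 999 * 1000 / 2 = 499500 for both numeric columns, and '999' as the lexicographic maximum of the strings '0'..'999', which is why the line 499500 499500 999 repeats three times.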