Speedup codec NONE

This commit is contained in:
Alexey Milovidov 2021-03-25 23:08:47 +03:00
parent 0675f9403c
commit a8ce138788
5 changed files with 42 additions and 9 deletions

View File

@ -51,7 +51,7 @@ bool CachedCompressedReadBuffer::nextImpl()
{
owned_cell->additional_bytes = codec->getAdditionalSizeAtTheEndOfBuffer();
owned_cell->data.resize(size_decompressed + owned_cell->additional_bytes);
decompress(owned_cell->data.data(), size_decompressed, size_compressed_without_checksum);
decompressTo(owned_cell->data.data(), size_decompressed, size_compressed_without_checksum);
}

View File

@ -21,7 +21,7 @@ bool CompressedReadBuffer::nextImpl()
memory.resize(size_decompressed + additional_size_at_the_end_of_buffer);
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
return true;
}
@ -48,7 +48,7 @@ size_t CompressedReadBuffer::readBig(char * to, size_t n)
/// If the decompressed block fits entirely where it needs to be copied.
if (size_decompressed + additional_size_at_the_end_of_buffer <= n - bytes_read)
{
decompress(to + bytes_read, size_decompressed, size_compressed_without_checksum);
decompressTo(to + bytes_read, size_decompressed, size_compressed_without_checksum);
bytes_read += size_decompressed;
bytes += size_decompressed;
}
@ -63,7 +63,7 @@ size_t CompressedReadBuffer::readBig(char * to, size_t n)
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
pos = working_buffer.begin();
decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
bytes_read += read(to + bytes_read, n - bytes_read);
break;

View File

@ -184,7 +184,7 @@ size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed,
}
void CompressedReadBufferBase::decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum)
static void readHeaderAndGetCodec(const char * compressed_buffer, size_t size_decompressed, CompressionCodecPtr & codec, bool allow_different_codecs)
{
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);
@ -210,11 +210,38 @@ void CompressedReadBufferBase::decompress(char * to, size_t size_decompressed, s
ErrorCodes::CANNOT_DECOMPRESS);
}
}
}
/// Decompress the block currently held in `compressed_buffer` into the memory starting at `to`.
/// The destination is never redirected: the codec always writes to exactly the address given.
///
/// @param to                                 Start of the destination memory.
/// @param size_decompressed                  Decompressed size of the block, forwarded to header processing.
/// @param size_compressed_without_checksum   Size of the compressed block, passed to the codec as the source size.
void CompressedReadBufferBase::decompressTo(char * to, size_t size_decompressed, size_t size_compressed_without_checksum)
{
    /// Header processing comes first: `codec` is handed over by reference here and used right below.
    readHeaderAndGetCodec(
        compressed_buffer,
        size_decompressed,
        codec,
        allow_different_codecs);

    /// Output goes to the exact location the caller asked for.
    codec->decompress(compressed_buffer, size_compressed_without_checksum, to);
}
/// Decompress the block currently held in `compressed_buffer`, allowing `to` to be redirected
/// instead of filled when no real decompression is needed.
///
/// After header processing there are two mutually exclusive cases:
///  - codec NONE: the bytes following the codec header inside `compressed_buffer` are already the
///    uncompressed data, so `to` is re-pointed at them and nothing is copied;
///  - any other codec: the data is decompressed into the memory starting at `to.begin()`.
///
/// @param to                                 Destination buffer; may be replaced by a view into `compressed_buffer`.
/// @param size_decompressed                  Decompressed size of the block, forwarded to header processing.
/// @param size_compressed_without_checksum   Size of the compressed block including the codec header.
/// @throws Exception with CORRUPTED_DATA if a NONE-codec block is shorter than the codec header.
///
/// NOTE(review): a redirected `to` aliases `compressed_buffer`, so it presumably stays valid only
/// until the next compressed block is read - confirm against callers.
void CompressedReadBufferBase::decompress(BufferBase::Buffer & to, size_t size_decompressed, size_t size_compressed_without_checksum)
{
    readHeaderAndGetCodec(compressed_buffer, size_decompressed, codec, allow_different_codecs);

    if (codec->isNone())
    {
        /// Shortcut for NONE codec to avoid extra memcpy.
        /// We do it by changing the buffer `to` to point to the existing uncompressed data.
        UInt8 header_size = ICompressionCodec::getHeaderSize();
        if (size_compressed_without_checksum < header_size)
            throw Exception(ErrorCodes::CORRUPTED_DATA,
                "Can't decompress data: the compressed data size ({}), this should include header size) is less than the header size ({})",
                size_compressed_without_checksum, size_t(header_size));

        to = BufferBase::Buffer(compressed_buffer + header_size, compressed_buffer + size_compressed_without_checksum);
    }
    else
    {
        /// Must not run after the shortcut above: `to` would then alias the source data,
        /// and the codec would copy the payload onto itself.
        codec->decompress(compressed_buffer, size_compressed_without_checksum, to.begin());
    }
}
/// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'.
CompressedReadBufferBase::CompressedReadBufferBase(ReadBuffer * in, bool allow_different_codecs_)
: compressed_in(in), own_compressed_buffer(0), allow_different_codecs(allow_different_codecs_)

View File

@ -3,6 +3,7 @@
#include <Common/PODArray.h>
#include <Compression/LZ4_decompress_faster.h>
#include <Compression/ICompressionCodec.h>
#include <IO/BufferBase.h>
namespace DB
@ -37,7 +38,12 @@ protected:
/// Returns number of compressed bytes read.
size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum, bool always_copy);
void decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum);
/// Decompress into the memory pointed to by `to`.
void decompressTo(char * to, size_t size_decompressed, size_t size_compressed_without_checksum);
/// This method can change location of `to` to avoid unnecessary copy if data is uncompressed.
/// It is more efficient for compression codec NONE but not suitable if you want to decompress into specific location.
void decompress(BufferBase::Buffer & to, size_t size_decompressed, size_t size_compressed_without_checksum);
public:
/// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'.

View File

@ -31,7 +31,7 @@ bool CompressedReadBufferFromFile::nextImpl()
memory.resize(size_decompressed + additional_size_at_the_end_of_buffer);
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
return true;
}
@ -108,7 +108,7 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
/// If the decompressed block fits entirely where it needs to be copied.
if (size_decompressed + additional_size_at_the_end_of_buffer <= n - bytes_read)
{
decompress(to + bytes_read, size_decompressed, size_compressed_without_checksum);
decompressTo(to + bytes_read, size_decompressed, size_compressed_without_checksum);
bytes_read += size_decompressed;
bytes += size_decompressed;
}
@ -124,7 +124,7 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
pos = working_buffer.begin();
decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
bytes_read += read(to + bytes_read, n - bytes_read);
break;