Refactoring (sanitizer failure)

This commit is contained in:
alesapin 2018-12-19 20:20:18 +03:00
parent 9b0d47c0c3
commit 512fe3c854
17 changed files with 277 additions and 196 deletions

View File

@ -10,35 +10,34 @@
namespace DB
{
char CompressionCodecLZ4::getMethodByte()
UInt8 CompressionCodecLZ4::getMethodByte() const
{
return static_cast<char>(CompressionMethodByte::LZ4);
return static_cast<UInt8>(CompressionMethodByte::LZ4);
}
void CompressionCodecLZ4::getCodecDesc(String & codec_desc)
String CompressionCodecLZ4::getCodecDesc() const
{
codec_desc = "LZ4";
return "LZ4";
}
size_t CompressionCodecLZ4::getCompressedReserveSize(size_t uncompressed_size)
UInt32 CompressionCodecLZ4::getCompressedDataSize(UInt32 uncompressed_size) const
{
return LZ4_COMPRESSBOUND(uncompressed_size);
}
size_t CompressionCodecLZ4::compress(char * source, size_t source_size, char * dest)
UInt32 CompressionCodecLZ4::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
return LZ4_compress_default(source, dest, source_size, LZ4_COMPRESSBOUND(source_size));
}
size_t CompressionCodecLZ4::decompress(char * source, size_t source_size, char * dest, size_t size_decompressed)
void CompressionCodecLZ4::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
LZ4::decompress(source, dest, source_size, size_decompressed, lz4_stat);
return size_decompressed;
LZ4::decompress(source, dest, source_size, uncompressed_size, lz4_stat);
}
void registerCodecLZ4(CompressionCodecFactory & factory)
{
factory.registerSimpleCompressionCodec("LZ4", static_cast<char>(CompressionMethodByte::LZ4), [&](){
factory.registerSimpleCompressionCodec("LZ4", static_cast<UInt8>(CompressionMethodByte::LZ4), [&](){
return std::make_shared<CompressionCodecLZ4>();
});
}

View File

@ -12,18 +12,19 @@ namespace DB
class CompressionCodecLZ4 : public ICompressionCodec
{
public:
char getMethodByte() override;
UInt8 getMethodByte() const override;
void getCodecDesc(String & codec_desc) override;
size_t compress(char * source, size_t source_size, char * dest) override;
size_t getCompressedReserveSize(size_t uncompressed_size) override;
size_t decompress(char * source, size_t source_size, char * dest, size_t decompressed_size) override;
String getCodecDesc() const override;
private:
LZ4::PerformanceStatistics lz4_stat;
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getCompressedDataSize(UInt32 uncompressed_size) const override;
mutable LZ4::PerformanceStatistics lz4_stat;
};
}
}

View File

@ -4,6 +4,7 @@
#include <Compression/CompressionFactory.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/hex.h>
namespace DB
@ -25,100 +26,101 @@ CompressionCodecMultiple::CompressionCodecMultiple(Codecs codecs)
codec_desc = codec_desc + ',';
const auto codec = codecs[idx];
String inner_codec_desc;
codec->getCodecDesc(inner_codec_desc);
codec_desc = codec_desc + inner_codec_desc;
codec_desc = codec_desc + codec->getCodecDesc();
}
}
char CompressionCodecMultiple::getMethodByte()
UInt8 CompressionCodecMultiple::getMethodByte() const
{
return static_cast<char>(CompressionMethodByte::Multiple);
return static_cast<UInt8>(CompressionMethodByte::Multiple);
}
void CompressionCodecMultiple::getCodecDesc(String & codec_desc_)
String CompressionCodecMultiple::getCodecDesc() const
{
codec_desc_ = codec_desc;
return codec_desc;
}
size_t CompressionCodecMultiple::getCompressedReserveSize(size_t uncompressed_size)
UInt32 CompressionCodecMultiple::getCompressedDataSize(UInt32 uncompressed_size) const
{
UInt32 compressed_size = uncompressed_size;
for (auto & codec : codecs)
uncompressed_size += codec->getCompressedReserveSize(uncompressed_size);
compressed_size = codec->getCompressedReserveSize(compressed_size);
/// MultipleCodecByte TotalCodecs ByteForEachCodec data
return sizeof(UInt8) + sizeof(UInt8) + codecs.size() + uncompressed_size;
/// TotalCodecs ByteForEachCodec data
return sizeof(UInt8) + codecs.size() + compressed_size;
}
size_t CompressionCodecMultiple::compress(char * source, size_t source_size, char * dest)
UInt32 CompressionCodecMultiple::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
static constexpr size_t header_for_size_store = sizeof(UInt32) + sizeof(UInt32);
PODArray<char> compressed_buf;
PODArray<char> uncompressed_buf(source, source + source_size);
dest[0] = static_cast<char>(getMethodByte());
dest[1] = static_cast<char>(codecs.size());
dest[0] = static_cast<UInt8>(codecs.size());
size_t codecs_byte_pos = 2;
size_t codecs_byte_pos = 1;
for (size_t idx = 0; idx < codecs.size(); ++idx, ++codecs_byte_pos)
{
const auto codec = codecs[idx];
dest[codecs_byte_pos] = codec->getMethodByte();
compressed_buf.resize(header_for_size_store + codec->getCompressedReserveSize(source_size));
compressed_buf.resize(codec->getCompressedReserveSize(source_size));
size_t size_compressed = header_for_size_store;
size_compressed += codec->compress(&uncompressed_buf[0], source_size, &compressed_buf[header_for_size_store]);
UInt32 compressed_size_32 = size_compressed;
UInt32 uncompressed_size_32 = source_size;
unalignedStore(&compressed_buf[0], compressed_size_32);
unalignedStore(&compressed_buf[4], uncompressed_size_32);
UInt32 size_compressed = codec->compress(uncompressed_buf.data(), source_size, compressed_buf.data());
uncompressed_buf.swap(compressed_buf);
source_size = size_compressed;
}
memcpy(&dest[2 + codecs.size()], &uncompressed_buf[0], source_size);
//std::cerr << "(compress) BUF_SIZE_COMPRESSED:" << source_size << std::endl;
return 2 + codecs.size() + source_size;
memcpy(&dest[1 + codecs.size()], uncompressed_buf.data(), source_size);
//std::cerr << "(compress) COMPRESSING BUF:\n";
//for (size_t i = 0; i < source_size + 1 + codecs.size(); ++i)
// std::cerr << getHexUIntLowercase(+dest[i]) << " ";
//std::cerr << std::endl;
return 1 + codecs.size() + source_size;
}
size_t CompressionCodecMultiple::decompress(char * source, size_t source_size, char * dest, size_t decompressed_size)
void CompressionCodecMultiple::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 decompressed_size) const
{
UInt8 compression_methods_size = source[0];
static constexpr size_t header_for_size_store = sizeof(UInt32) + sizeof(UInt32);
//std::cerr << "(decompress) DECOMPRESSING BUF:\n";
//for (size_t i = 0; i < source_size; ++i)
// std::cerr << getHexUIntLowercase(+source[i]) << " ";
//std::cerr << std::endl;
if (source[0] != getMethodByte())
throw Exception("Incorrect compression method for codec multiple, given " + toString(source[0]) + ", expected " + toString(getMethodByte()),
ErrorCodes::UNKNOWN_CODEC);
UInt8 compression_methods_size = source[1];
PODArray<char> compressed_buf;
//std::cerr << "(decompress) BUF_SIZE_COMPRESSED:" << source_size << std::endl;
//std::cerr << "(decompress) CODECS SIZE:" << +compression_methods_size << std::endl;
PODArray<char> compressed_buf(&source[compression_methods_size + 1], &source[source_size]);
PODArray<char> uncompressed_buf;
/// Insert all data into compressed buf
compressed_buf.insert(&source[compression_methods_size + 2], &source[source_size]);
source_size -= (compression_methods_size + 1);
for (long idx = compression_methods_size - 1; idx >= 0; --idx)
{
UInt8 compression_method = source[idx + 2];
UInt8 compression_method = source[idx + 1];
const auto codec = CompressionCodecFactory::instance().get(compression_method);
UInt32 compressed_size = unalignedLoad<UInt32>(&compressed_buf[0]);
UInt32 uncompressed_size = unalignedLoad<UInt32>(&compressed_buf[4]);
UInt32 uncompressed_size = ICompressionCodec::readDecompressedBlockSize(compressed_buf.data());
//std::cerr << "(decompress) UNCOMPRESSED SIZE READ:" << uncompressed_size << std::endl;
if (idx == 0 && uncompressed_size != decompressed_size)
throw Exception("Wrong final decompressed size in codec Multiple, got " + toString(uncompressed_size) + ", expected " + toString(decompressed_size), ErrorCodes::CORRUPTED_DATA);
uncompressed_buf.resize(uncompressed_size);
codec->decompress(&compressed_buf[header_for_size_store], compressed_size - header_for_size_store, &uncompressed_buf[0], uncompressed_size);
codec->decompress(compressed_buf.data(), source_size, uncompressed_buf.data());
uncompressed_buf.swap(compressed_buf);
source_size = uncompressed_size;
}
memcpy(dest, compressed_buf.data(), decompressed_size);
return decompressed_size;
}
void registerCodecMultiple(CompressionCodecFactory & factory)
{
factory.registerSimpleCompressionCodec("Multiple", static_cast<char>(CompressionMethodByte::Multiple), [&](){
factory.registerSimpleCompressionCodec("Multiple", static_cast<UInt8>(CompressionMethodByte::Multiple), [&](){
return std::make_shared<CompressionCodecMultiple>();
});
}

View File

@ -11,15 +11,16 @@ public:
CompressionCodecMultiple() = default;
explicit CompressionCodecMultiple(Codecs codecs);
char getMethodByte() override;
UInt8 getMethodByte() const override;
void getCodecDesc(String & codec_desc) override;
String getCodecDesc() const override;
size_t compress(char * source, size_t source_size, char * dest) override;
UInt32 getCompressedDataSize(UInt32 uncompressed_size) const override;
size_t getCompressedReserveSize(size_t uncompressed_size) override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
size_t decompress(char *source, size_t source_size, char *dest, size_t decompressed_size) override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
private:
Codecs codecs;

View File

@ -6,26 +6,25 @@
namespace DB
{
char CompressionCodecNone::getMethodByte()
UInt8 CompressionCodecNone::getMethodByte() const
{
return static_cast<char>(CompressionMethodByte::NONE);
return static_cast<UInt8>(CompressionMethodByte::NONE);
}
void CompressionCodecNone::getCodecDesc(String & codec_desc)
String CompressionCodecNone::getCodecDesc() const
{
codec_desc = "NONE";
return "NONE";
}
size_t CompressionCodecNone::compress(char * source, size_t source_size, char * dest)
UInt32 CompressionCodecNone::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
memcpy(dest, source, source_size);
return source_size;
}
size_t CompressionCodecNone::decompress(char * source, size_t /*source_size*/, char * dest, size_t size_decompressed)
void CompressionCodecNone::doDecompressData(const char * source, UInt32 /*source_size*/, char * dest, UInt32 uncompressed_size) const
{
memcpy(dest, source, size_decompressed);
return size_decompressed;
memcpy(dest, source, uncompressed_size);
}
void registerCodecNone(CompressionCodecFactory & factory)

View File

@ -11,13 +11,15 @@ namespace DB
class CompressionCodecNone : public ICompressionCodec
{
public:
char getMethodByte() override;
UInt8 getMethodByte() const override;
void getCodecDesc(String & codec_desc) override;
String getCodecDesc() const override;
size_t compress(char * source, size_t source_size, char * compressed_buf) override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
size_t decompress(char *source, size_t source_size, char *dest, size_t decompressed_size) override;
};
}
}

View File

@ -21,22 +21,23 @@ namespace ErrorCodes
extern const int ILLEGAL_CODEC_PARAMETER;
}
char CompressionCodecZSTD::getMethodByte()
UInt8 CompressionCodecZSTD::getMethodByte() const
{
return static_cast<char>(CompressionMethodByte::ZSTD);
return static_cast<UInt8>(CompressionMethodByte::ZSTD);
}
void CompressionCodecZSTD::getCodecDesc(String & codec_desc)
String CompressionCodecZSTD::getCodecDesc() const
{
codec_desc = "ZSTD";
return "ZSTD";
}
size_t CompressionCodecZSTD::getCompressedReserveSize(size_t uncompressed_size)
UInt32 CompressionCodecZSTD::getCompressedDataSize(UInt32 uncompressed_size) const
{
return ZSTD_compressBound(uncompressed_size);
}
size_t CompressionCodecZSTD::compress(char * source, size_t source_size, char * dest)
UInt32 CompressionCodecZSTD::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
size_t compressed_size = ZSTD_compress(dest, ZSTD_compressBound(source_size), source, source_size, level);
@ -46,18 +47,17 @@ size_t CompressionCodecZSTD::compress(char * source, size_t source_size, char *
return compressed_size;
}
size_t CompressionCodecZSTD::decompress(char * source, size_t source_size, char * dest, size_t size_decompressed)
void CompressionCodecZSTD::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
size_t res = ZSTD_decompress(dest, size_decompressed, source, source_size);
size_t res = ZSTD_decompress(dest, uncompressed_size, source, source_size);
if (ZSTD_isError(res))
throw Exception("Cannot ZSTD_decompress: " + std::string(ZSTD_getErrorName(res)), ErrorCodes::CANNOT_DECOMPRESS);
return size_decompressed;
}
CompressionCodecZSTD::CompressionCodecZSTD(int level)
:level(level)
CompressionCodecZSTD::CompressionCodecZSTD(int level_)
:level(level_)
{
}

View File

@ -11,20 +11,21 @@ namespace DB
class CompressionCodecZSTD : public ICompressionCodec
{
public:
CompressionCodecZSTD(int level);
CompressionCodecZSTD(int level_);
char getMethodByte() override;
UInt8 getMethodByte() const override;
void getCodecDesc(String & codec_desc) override;
String getCodecDesc() const override;
size_t compress(char * source, size_t source_size, char * dest) override;
UInt32 getCompressedDataSize(UInt32 uncompressed_size) const override;
size_t getCompressedReserveSize(size_t uncompressed_size) override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
size_t decompress(char *source, size_t source_size, char *dest, size_t decompressed_size) override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
private:
int level;
};
}
}

View File

@ -28,97 +28,147 @@ namespace ErrorCodes
extern const int SEEK_POSITION_OUT_OF_BOUND;
}
CompressionCodecReadBufferPtr ICompressionCodec::liftCompressed(ReadBuffer & origin)
static constexpr auto CHECKSUM_SIZE{sizeof(CityHash_v1_0_2::uint128)};
UInt32 ICompressionCodec::compress(char * source, UInt32 source_size, char * dest) const
{
return std::make_shared<CompressionCodecReadBuffer>(origin);
dest[0] = getMethodByte();
UInt8 header_size = getHeaderSize();
/// Write data from header_size
UInt32 compressed_bytes_written = doCompressData(source, source_size, &dest[header_size]);
unalignedStore<UInt32>(&dest[1], compressed_bytes_written + header_size);
unalignedStore<UInt32>(&dest[5], source_size);
return header_size + compressed_bytes_written;
}
CompressionCodecWriteBufferPtr ICompressionCodec::liftCompressed(WriteBuffer & origin)
UInt32 ICompressionCodec::decompress(char * source, UInt32 source_size, char * dest) const
{
return std::make_shared<CompressionCodecWriteBuffer>(*this, origin);
UInt8 method = source[0];
if (method != getMethodByte())
throw Exception("Can't decompress data with codec byte " + toString(method) + " from codec with byte " + toString(method), ErrorCodes::CANNOT_DECOMPRESS);
UInt8 header_size = getHeaderSize();
UInt32 decompressed_size = unalignedLoad<UInt32>(&source[5]);
doDecompressData(&source[header_size], source_size - header_size, dest, decompressed_size);
return decompressed_size;
}
CompressionCodecReadBuffer::CompressionCodecReadBuffer(ReadBuffer & origin)
: origin(origin)
UInt32 ICompressionCodec::readCompressedBlockSize(const char * source)
{
return unalignedLoad<UInt32>(&source[1]);
}
UInt32 ICompressionCodec::readDecompressedBlockSize(const char * source)
{
return unalignedLoad<UInt32>(&source[5]);
}
UInt8 ICompressionCodec::readMethod(const char * source)
{
return static_cast<UInt8>(source[0]);
}
CompressionCodecReadBufferPtr liftCompressed(CompressionCodecPtr codec, ReadBuffer & origin)
{
return std::make_shared<CompressionCodecReadBuffer>(codec, origin);
}
CompressionCodecWriteBufferPtr liftCompressed(CompressionCodecPtr codec, WriteBuffer & origin)
{
return std::make_shared<CompressionCodecWriteBuffer>(codec, origin);
}
CompressionCodecReadBuffer::CompressionCodecReadBuffer(CompressionCodecPtr codec_, ReadBuffer & origin_)
: codec(codec_)
, origin(origin_)
{
}
/// Read compressed data into compressed_buffer. Get size of decompressed data from block header. Checksum if need.
/// Returns number of compressed bytes read.
size_t CompressionCodecReadBuffer::readCompressedData(size_t & size_decompressed, size_t & size_compressed)
std::pair<UInt32, UInt32> CompressionCodecReadBuffer::readCompressedData()
{
if (origin.eof())
return 0;
return std::make_pair(0, 0);
CityHash_v1_0_2::uint128 checksum;
origin.readStrict(reinterpret_cast<char *>(&checksum), sizeof(checksum));
origin.readStrict(reinterpret_cast<char *>(&checksum), CHECKSUM_SIZE);
own_compressed_buffer.resize(COMPRESSED_BLOCK_HEADER_SIZE);
origin.readStrict(own_compressed_buffer.data(), COMPRESSED_BLOCK_HEADER_SIZE);
UInt8 header_size = ICompressionCodec::getHeaderSize();
own_compressed_buffer.resize(header_size);
origin.readStrict(own_compressed_buffer.data(), header_size);
size_compressed = unalignedLoad<UInt32>(&own_compressed_buffer[1]);
size_decompressed = unalignedLoad<UInt32>(&own_compressed_buffer[5]);
UInt8 method = ICompressionCodec::readMethod(own_compressed_buffer.data());
if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
throw Exception("Too large size_compressed: " + toString(size_compressed) + ". Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
if (method != codec->getMethodByte())
throw Exception("Can't decompress with method " + getHexUIntLowercase(method) + ", with codec " + getHexUIntLowercase(codec->getMethodByte()), ErrorCodes::CANNOT_DECOMPRESS);
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed + sizeof(checksum));
UInt32 size_to_read_compressed = ICompressionCodec::readCompressedBlockSize(own_compressed_buffer.data());
UInt32 size_decompressed = ICompressionCodec::readDecompressedBlockSize(own_compressed_buffer.data());
if (size_to_read_compressed > DBMS_MAX_COMPRESSED_SIZE)
throw Exception("Too large size_to_read_compressed: " + toString(size_to_read_compressed) + ". Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_to_read_compressed + CHECKSUM_SIZE);
/// Is whole compressed block located in 'origin' buffer?
if (origin.offset() >= COMPRESSED_BLOCK_HEADER_SIZE &&
origin.position() + size_compressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER - COMPRESSED_BLOCK_HEADER_SIZE <= origin.buffer().end())
if (origin.offset() >= header_size &&
origin.position() + size_to_read_compressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER - header_size <= origin.buffer().end())
{
origin.position() -= COMPRESSED_BLOCK_HEADER_SIZE;
origin.position() -= header_size;
compressed_buffer = origin.position();
origin.position() += size_compressed;
origin.position() += size_to_read_compressed;
}
else
{
own_compressed_buffer.resize(size_compressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
own_compressed_buffer.resize(size_to_read_compressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
compressed_buffer = own_compressed_buffer.data();
origin.readStrict(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, size_compressed - COMPRESSED_BLOCK_HEADER_SIZE);
origin.readStrict(compressed_buffer + header_size, size_to_read_compressed - header_size);
}
auto checksum_calculated = CityHash_v1_0_2::CityHash128(compressed_buffer, size_compressed);
auto checksum_calculated = CityHash_v1_0_2::CityHash128(compressed_buffer, size_to_read_compressed);
if (checksum != checksum_calculated)
throw Exception("Checksum doesn't match: corrupted data."
" Reference: " + getHexUIntLowercase(checksum.first) + getHexUIntLowercase(checksum.second)
+ ". Actual: " + getHexUIntLowercase(checksum_calculated.first) + getHexUIntLowercase(checksum_calculated.second)
+ ". Size of compressed block: " + toString(size_compressed) + ".",
+ ". Size of compressed block: " + toString(size_to_read_compressed),
ErrorCodes::CHECKSUM_DOESNT_MATCH);
return size_compressed + sizeof(checksum);
return std::make_pair(size_to_read_compressed, size_decompressed);
}
void CompressionCodecReadBuffer::decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum)
void CompressionCodecReadBuffer::decompress(char * to, UInt32 size_compressed)
{
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);
UInt8 method = ICompressionCodec::readMethod(compressed_buffer);
UInt8 current_method = compressed_buffer[0]; /// See CompressedWriteBuffer.h
if (!codec || current_method != method)
{
method = current_method;
if (!codec)
codec = CompressionCodecFactory::instance().get(method);
}
else if (codec->getMethodByte() != method)
throw Exception("Can't decompress data with byte " + getHexUIntLowercase(method) + " expected byte " + getHexUIntLowercase(codec->getMethodByte()), ErrorCodes::CANNOT_DECOMPRESS);
codec->decompress(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE,
size_compressed_without_checksum - COMPRESSED_BLOCK_HEADER_SIZE, to, size_decompressed);
codec->decompress(compressed_buffer, size_compressed, to);
}
bool CompressionCodecReadBuffer::nextImpl()
{
size_t size_decompressed;
size_t size_compressed_without_checksum;
UInt32 size_decompressed;
size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum);
if (!size_compressed)
std::tie(read_compressed_bytes_for_last_time, size_decompressed) = readCompressedData();
if (!read_compressed_bytes_for_last_time)
return false;
memory.resize(size_decompressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
decompress(working_buffer.begin(), read_compressed_bytes_for_last_time);
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
return true;
}
@ -127,16 +177,19 @@ void CompressionCodecReadBuffer::seek(size_t offset_in_compressed_file, size_t o
{
if (const auto file_in = dynamic_cast<ReadBufferFromFileBase *>(&origin))
{
if (size_compressed &&
offset_in_compressed_file == file_in->getPositionInFile() - size_compressed &&
offset_in_decompressed_block <= working_buffer.size())
UInt32 readed_size_with_checksum = read_compressed_bytes_for_last_time + CHECKSUM_SIZE;
UInt32 last_readed_block_start_pos = file_in->getPositionInFile() - readed_size_with_checksum;
/// We seek in already uncompressed block
if (readed_size_with_checksum && /// we already have read something
offset_in_compressed_file == last_readed_block_start_pos && /// our position is exactly at required byte
offset_in_decompressed_block <= working_buffer.size()) /// our buffer size is more, than required position in uncompressed block
{
bytes += offset();
pos = working_buffer.begin() + offset_in_decompressed_block;
/// `bytes` can overflow and get negative, but in `count()` everything will overflow back and get right.
bytes -= offset();
}
else
else /// or we have to read and uncompress further
{
file_in->seek(offset_in_compressed_file);
@ -173,26 +226,18 @@ void CompressionCodecWriteBuffer::nextImpl()
if (!offset())
return;
static constexpr size_t header_size = 1 + sizeof(UInt32) + sizeof(UInt32);
size_t decompressed_size = offset();
UInt32 compressed_reserve_size = codec->getCompressedReserveSize(decompressed_size);
compressed_buffer.resize(compressed_reserve_size);
UInt32 compressed_size = codec->compress(working_buffer.begin(), decompressed_size, compressed_buffer.data());
size_t uncompressed_size = offset();
size_t compressed_reserve_size = compression_codec.getCompressedReserveSize(uncompressed_size);
compressed_buffer.resize(header_size + compressed_reserve_size);
compressed_buffer[0] = compression_codec.getMethodByte();
size_t compressed_size = header_size + compression_codec.compress(working_buffer.begin(), uncompressed_size, &compressed_buffer[header_size]);
UInt32 compressed_size_32 = compressed_size;
UInt32 uncompressed_size_32 = uncompressed_size;
unalignedStore(&compressed_buffer[1], compressed_size_32);
unalignedStore(&compressed_buffer[5], uncompressed_size_32);
CityHash_v1_0_2::uint128 checksum = CityHash_v1_0_2::CityHash128(compressed_buffer.data(), compressed_size);
out.write(reinterpret_cast<const char *>(&checksum), sizeof(checksum));
out.write(reinterpret_cast<const char *>(&checksum), CHECKSUM_SIZE);
out.write(compressed_buffer.data(), compressed_size);
}
CompressionCodecWriteBuffer::CompressionCodecWriteBuffer(ICompressionCodec & compression_codec, WriteBuffer & out, size_t buf_size)
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out), compression_codec(compression_codec)
CompressionCodecWriteBuffer::CompressionCodecWriteBuffer(CompressionCodecPtr codec_, WriteBuffer & out_, size_t buf_size)
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), codec(codec_)
{
}

View File

@ -28,44 +28,45 @@ using CompressionCodecWriteBufferPtr = std::shared_ptr<CompressionCodecWriteBuff
class CompressionCodecWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
public:
CompressionCodecWriteBuffer(ICompressionCodec & compression_codec, WriteBuffer & out, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
CompressionCodecWriteBuffer(CompressionCodecPtr codec_, WriteBuffer & out_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
~CompressionCodecWriteBuffer() override;
private:
void nextImpl() override;
private:
WriteBuffer & out;
ICompressionCodec & compression_codec;
CompressionCodecPtr codec;
PODArray<char> compressed_buffer;
};
class CompressionCodecReadBuffer : public BufferWithOwnMemory<ReadBuffer>
{
UInt32 read_compressed_bytes_for_last_time = 0;
public:
size_t size_compressed = 0;
size_t size_decompressed = 0;
std::pair<UInt32, UInt32> readCompressedData();
CompressionCodecReadBuffer(ReadBuffer & origin);
void decompress(char * to, UInt32 size_compressed);
size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed);
void decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum);
CompressionCodecReadBuffer(CompressionCodecPtr codec_, ReadBuffer & origin_);
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);
private:
CompressionCodecPtr codec;
ReadBuffer & origin;
char * compressed_buffer;
UInt8 method;
CompressionCodecPtr codec;
PODArray<char> own_compressed_buffer;
bool nextImpl() override;
};
CompressionCodecReadBufferPtr liftCompressed(CompressionCodecPtr codec, ReadBuffer & origin);
CompressionCodecWriteBufferPtr liftCompressed(CompressionCodecPtr codec, WriteBuffer & origin);
/**
*
*/
@ -74,20 +75,31 @@ class ICompressionCodec : private boost::noncopyable
public:
virtual ~ICompressionCodec() = default;
CompressionCodecReadBufferPtr liftCompressed(ReadBuffer & origin);
virtual UInt8 getMethodByte() const = 0;
CompressionCodecWriteBufferPtr liftCompressed(WriteBuffer & origin);
virtual String getCodecDesc() const = 0;
virtual char getMethodByte() = 0;
virtual UInt32 compress(char * source, UInt32 source_size, char * dest) const;
/// TODO(alesap) FIXME
virtual void getCodecDesc(String & codec_desc) = 0;
virtual UInt32 decompress(char * source, UInt32 source_size, char * dest) const;
virtual size_t compress(char * source, size_t source_size, char * dest) = 0;
virtual UInt32 getCompressedReserveSize(UInt32 uncompressed_size) const { return getHeaderSize() + getCompressedDataSize(uncompressed_size); }
virtual size_t decompress(char * source, size_t source_size, char * dest, size_t decompressed_size) = 0;
static UInt8 getHeaderSize() { return 1 + 8; }
virtual size_t getCompressedReserveSize(size_t uncompressed_size) { return uncompressed_size; }
static UInt32 readCompressedBlockSize(const char * source);
static UInt32 readDecompressedBlockSize(const char * source);
static UInt8 readMethod(const char * source);
protected:
virtual UInt32 getCompressedDataSize(UInt32 uncompressed_size) const { return uncompressed_size; }
virtual UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const = 0;
virtual void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const = 0;
};
}

View File

@ -20,7 +20,7 @@ void CachedCompressedReadBuffer::initInput()
if (!file_in)
{
file_in = createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size);
in = codec->liftCompressed(*file_in);
in = liftCompressed(codec, *file_in);
if (profile_callback)
file_in->setProfileCallback(profile_callback, clock_type);
@ -42,17 +42,16 @@ bool CachedCompressedReadBuffer::nextImpl()
owned_cell = std::make_shared<UncompressedCacheCell>();
size_t size_decompressed;
size_t size_compressed_without_checksum;
owned_cell->compressed_size = in->readCompressedData(size_decompressed, size_compressed_without_checksum);
UInt32 size_decompressed;
std::tie(owned_cell->compressed_size, size_decompressed) = in->readCompressedData();
if (owned_cell->compressed_size)
{
owned_cell->data.resize(size_decompressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
in->decompress(owned_cell->data.data(), size_decompressed, size_compressed_without_checksum);
in->decompress(owned_cell->data.data(), owned_cell->compressed_size);
in->buffer() = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
in->decompress(owned_cell->data.data(), size_decompressed, size_compressed_without_checksum);
in->decompress(owned_cell->data.data(), owned_cell->compressed_size);
/// Put data into cache.
cache->set(key, owned_cell);
@ -74,9 +73,9 @@ bool CachedCompressedReadBuffer::nextImpl()
CachedCompressedReadBuffer::CachedCompressedReadBuffer(
const std::string & path_, UncompressedCache * cache_, const CompressionCodecPtr & codec,
const std::string & path_, UncompressedCache * cache_, const CompressionCodecPtr & codec_,
size_t estimated_size_, size_t aio_threshold_, size_t buf_size_)
: ReadBuffer(nullptr, 0), path(path_), cache(cache_), codec(codec), buf_size(buf_size_), estimated_size(estimated_size_),
: ReadBuffer(nullptr, 0), path(path_), cache(cache_), codec(codec_), buf_size(buf_size_), estimated_size(estimated_size_),
aio_threshold(aio_threshold_), file_pos(0)
{
}

View File

@ -45,7 +45,7 @@ private:
public:
CachedCompressedReadBuffer(
const std::string & path_, UncompressedCache * cache_, const CompressionCodecPtr & codec,
const std::string & path_, UncompressedCache * cache_, const CompressionCodecPtr & codec_,
size_t estimated_size_, size_t aio_threshold_, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);

View File

@ -371,8 +371,7 @@ ASTPtr InterpreterCreateQuery::formatColumns(const ColumnsDescription & columns)
const auto ct = columns.codecs.find(column.name);
if (ct != std::end(columns.codecs))
{
String codec_desc;
ct->second->getCodecDesc(codec_desc);
String codec_desc = ct->second->getCodecDesc();
codec_desc = "CODEC(" + codec_desc + ")";
auto pos = codec_desc.data();
const auto end = pos + codec_desc.size();

View File

@ -252,7 +252,7 @@ MergeTreeReader::Stream::Stream(
if (profile_callback)
file_in->setProfileCallback(profile_callback, clock_type);
const auto compressed_buffer = codec->liftCompressed(*file_in);
const auto compressed_buffer = liftCompressed(codec, *file_in);
non_cached_buffer = compressed_buffer;
data_buffer = non_cached_buffer.get();
}

View File

@ -192,7 +192,7 @@ IMergedBlockOutputStream::ColumnStream::ColumnStream(
data_file_extension{data_file_extension_},
marks_file_extension{marks_file_extension_},
plain_file(createWriteBufferFromFileBase(data_path + data_file_extension, estimated_size, aio_threshold, max_compress_block_size)),
plain_hashing(*plain_file), compressed_buf(compression_codec->liftCompressed(plain_hashing)), compressed(*compressed_buf.get()),
plain_hashing(*plain_file), compressed_buf(liftCompressed(compression_codec, plain_hashing)), compressed(*compressed_buf.get()),
marks_file(marks_path + marks_file_extension, 4096, O_TRUNC | O_CREAT | O_WRONLY), marks(marks_file)
{
}

View File

@ -11,3 +11,5 @@
9175437371954010821
1.5555555555555 hello world! [77] ['John']
7.1000000000000 xxxxxxxxxxxx [127] ['Henry']
!
222

View File

@ -49,19 +49,19 @@ INSERT INTO test.compression_codec_multiple VALUES (1, 'world', toDate('2018-10-
SELECT * FROM test.compression_codec_multiple ORDER BY id;
INSERT INTO test.compression_codec_multiple select modulo(number, 100), toString(number), toDate('2018-12-01'), 5.5 * number from system.numbers limit 10000;
INSERT INTO test.compression_codec_multiple select modulo(number, 100), toString(number), toDate('2018-12-01'), 5.5 * number FROM system.numbers limit 10000;
SELECT count(*) from test.compression_codec_multiple;
SELECT count(*) FROM test.compression_codec_multiple;
SELECT count(distinct data) from test.compression_codec_multiple;
SELECT count(distinct data) FROM test.compression_codec_multiple;
SELECT floor(sum(somenum), 1) from test.compression_codec_multiple;
SELECT floor(sum(somenum), 1) FROM test.compression_codec_multiple;
TRUNCATE TABLE test.compression_codec_multiple;
INSERT INTO test.compression_codec_multiple select modulo(number, 100), toString(number), toDate('2018-12-01'), 5.5 * number from system.numbers limit 10000;
INSERT INTO test.compression_codec_multiple select modulo(number, 100), toString(number), toDate('2018-12-01'), 5.5 * number FROM system.numbers limit 10000;
SELECT sum(cityHash64(*)) from test.compression_codec_multiple;
SELECT sum(cityHash64(*)) FROM test.compression_codec_multiple;
DROP TABLE IF EXISTS test.compression_codec_multiple_more_types;
@ -75,3 +75,22 @@ INSERT INTO test.compression_codec_multiple_more_types VALUES(1.5555555555555, '
INSERT INTO test.compression_codec_multiple_more_types VALUES(7.1, 'xxxxxxxxxxxx', [127], ['Henry']);
SELECT * FROM test.compression_codec_multiple_more_types order by id;
DROP TABLE IF EXISTS test.compression_codec_multiple_with_key;
CREATE TABLE test.compression_codec_multiple_with_key (
somedate Date CODEC(ZSTD, ZSTD, ZSTD(12)),
id UInt64 CODEC(LZ4, ZSTD, NONE),
data String CODEC(ZSTD(2), NONE, LZ4, LZ4)
) ENGINE = MergeTree() PARTITION BY somedate ORDER BY id SETTINGS index_granularity = 2;
INSERT INTO test.compression_codec_multiple_with_key VALUES(toDate('2018-10-12'), 100000, 'hello'), (toDate('2018-10-12'), 100002, 'world'), (toDate('2018-10-12'), 1111, '!');
SELECT data FROM test.compression_codec_multiple_with_key WHERE id BETWEEN 3 AND 1112;
INSERT INTO test.compression_codec_multiple_with_key SELECT toDate('2018-10-12'), number, toString(number) FROM system.numbers LIMIT 1000;
SELECT COUNT(DISTINCT data) FROM test.compression_codec_multiple_with_key WHERE id < 222;
DROP TABLE IF EXISTS test.compression_codec_multiple_with_key;