From 512fe3c854841028ea44a9f6415fad1263fc36b1 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 19 Dec 2018 20:20:18 +0300 Subject: [PATCH] Refactoring (sanitizer failure) --- dbms/src/Compression/CompressionCodecLZ4.cpp | 19 +- dbms/src/Compression/CompressionCodecLZ4.h | 21 +-- .../Compression/CompressionCodecMultiple.cpp | 84 ++++----- .../Compression/CompressionCodecMultiple.h | 11 +- dbms/src/Compression/CompressionCodecNone.cpp | 15 +- dbms/src/Compression/CompressionCodecNone.h | 12 +- dbms/src/Compression/CompressionCodecZSTD.cpp | 24 +-- dbms/src/Compression/CompressionCodecZSTD.h | 15 +- dbms/src/Compression/ICompressionCodec.cpp | 163 +++++++++++------- dbms/src/Compression/ICompressionCodec.h | 52 +++--- dbms/src/IO/CachedCompressedReadBuffer.cpp | 15 +- dbms/src/IO/CachedCompressedReadBuffer.h | 2 +- .../Interpreters/InterpreterCreateQuery.cpp | 3 +- .../Storages/MergeTree/MergeTreeReader.cpp | 2 +- .../MergeTree/MergedBlockOutputStream.cpp | 2 +- ...4_test_custom_compression_codecs.reference | 2 + .../00804_test_custom_compression_codecs.sql | 31 +++- 17 files changed, 277 insertions(+), 196 deletions(-) diff --git a/dbms/src/Compression/CompressionCodecLZ4.cpp b/dbms/src/Compression/CompressionCodecLZ4.cpp index ea9c7e65cf9..8c44ebfac43 100644 --- a/dbms/src/Compression/CompressionCodecLZ4.cpp +++ b/dbms/src/Compression/CompressionCodecLZ4.cpp @@ -10,35 +10,34 @@ namespace DB { -char CompressionCodecLZ4::getMethodByte() +UInt8 CompressionCodecLZ4::getMethodByte() const { - return static_cast(CompressionMethodByte::LZ4); + return static_cast(CompressionMethodByte::LZ4); } -void CompressionCodecLZ4::getCodecDesc(String & codec_desc) +String CompressionCodecLZ4::getCodecDesc() const { - codec_desc = "LZ4"; + return "LZ4"; } -size_t CompressionCodecLZ4::getCompressedReserveSize(size_t uncompressed_size) +UInt32 CompressionCodecLZ4::getCompressedDataSize(UInt32 uncompressed_size) const { return LZ4_COMPRESSBOUND(uncompressed_size); } -size_t CompressionCodecLZ4::compress(char * source, size_t source_size, char * dest) +UInt32 CompressionCodecLZ4::doCompressData(const char * source, UInt32 source_size, char * dest) const { return LZ4_compress_default(source, dest, source_size, LZ4_COMPRESSBOUND(source_size)); } -size_t CompressionCodecLZ4::decompress(char * source, size_t source_size, char * dest, size_t size_decompressed) +void CompressionCodecLZ4::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const { - LZ4::decompress(source, dest, source_size, size_decompressed, lz4_stat); - return size_decompressed; + LZ4::decompress(source, dest, source_size, uncompressed_size, lz4_stat); } void registerCodecLZ4(CompressionCodecFactory & factory) { - factory.registerSimpleCompressionCodec("LZ4", static_cast(CompressionMethodByte::LZ4), [&](){ + factory.registerSimpleCompressionCodec("LZ4", static_cast(CompressionMethodByte::LZ4), [&](){ return std::make_shared(); }); } diff --git a/dbms/src/Compression/CompressionCodecLZ4.h b/dbms/src/Compression/CompressionCodecLZ4.h index 0ee0286c4ee..1e0a00fa753 100644 --- a/dbms/src/Compression/CompressionCodecLZ4.h +++ b/dbms/src/Compression/CompressionCodecLZ4.h @@ -12,18 +12,19 @@ namespace DB class CompressionCodecLZ4 : public ICompressionCodec { public: - char getMethodByte() override; + UInt8 getMethodByte() const override; - void getCodecDesc(String & codec_desc) override; - - size_t compress(char * source, size_t source_size, char * dest) override; - - size_t getCompressedReserveSize(size_t uncompressed_size) override; - - size_t decompress(char * source, size_t source_size, char * dest, size_t decompressed_size) override; + String getCodecDesc() const override; private: - LZ4::PerformanceStatistics lz4_stat; + + UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; + + void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; + + UInt32 getCompressedDataSize(UInt32 uncompressed_size) const override; + + mutable LZ4::PerformanceStatistics lz4_stat; }; -} \ No newline at end of file +} diff --git a/dbms/src/Compression/CompressionCodecMultiple.cpp b/dbms/src/Compression/CompressionCodecMultiple.cpp index 47ac6931435..d74a7129402 100644 --- a/dbms/src/Compression/CompressionCodecMultiple.cpp +++ b/dbms/src/Compression/CompressionCodecMultiple.cpp @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB @@ -25,100 +26,101 @@ CompressionCodecMultiple::CompressionCodecMultiple(Codecs codecs) codec_desc = codec_desc + ','; const auto codec = codecs[idx]; - String inner_codec_desc; - codec->getCodecDesc(inner_codec_desc); - codec_desc = codec_desc + inner_codec_desc; + codec_desc = codec_desc + codec->getCodecDesc(); } } -char CompressionCodecMultiple::getMethodByte() +UInt8 CompressionCodecMultiple::getMethodByte() const { - return static_cast(CompressionMethodByte::Multiple); + return static_cast(CompressionMethodByte::Multiple); } -void CompressionCodecMultiple::getCodecDesc(String & codec_desc_) +String CompressionCodecMultiple::getCodecDesc() const { - codec_desc_ = codec_desc; + return codec_desc; } -size_t CompressionCodecMultiple::getCompressedReserveSize(size_t uncompressed_size) +UInt32 CompressionCodecMultiple::getCompressedDataSize(UInt32 uncompressed_size) const { + UInt32 compressed_size = uncompressed_size; for (auto & codec : codecs) - uncompressed_size += codec->getCompressedReserveSize(uncompressed_size); + compressed_size = codec->getCompressedReserveSize(compressed_size); - /// MultipleCodecByte TotalCodecs ByteForEachCodec data - return sizeof(UInt8) + sizeof(UInt8) + codecs.size() + uncompressed_size; + /// TotalCodecs ByteForEachCodec data + return sizeof(UInt8) + codecs.size() + compressed_size; } -size_t CompressionCodecMultiple::compress(char * source, size_t source_size, char * dest) +UInt32 CompressionCodecMultiple::doCompressData(const char * source, UInt32 source_size, char * dest) const { - static constexpr size_t header_for_size_store = sizeof(UInt32) + sizeof(UInt32); PODArray compressed_buf; PODArray uncompressed_buf(source, source + source_size); - dest[0] = static_cast(getMethodByte()); - dest[1] = static_cast(codecs.size()); + dest[0] = static_cast(codecs.size()); - size_t codecs_byte_pos = 2; + size_t codecs_byte_pos = 1; for (size_t idx = 0; idx < codecs.size(); ++idx, ++codecs_byte_pos) { const auto codec = codecs[idx]; dest[codecs_byte_pos] = codec->getMethodByte(); - compressed_buf.resize(header_for_size_store + codec->getCompressedReserveSize(source_size)); + compressed_buf.resize(codec->getCompressedReserveSize(source_size)); - size_t size_compressed = header_for_size_store; - size_compressed += codec->compress(&uncompressed_buf[0], source_size, &compressed_buf[header_for_size_store]); - - UInt32 compressed_size_32 = size_compressed; - UInt32 uncompressed_size_32 = source_size; - unalignedStore(&compressed_buf[0], compressed_size_32); - unalignedStore(&compressed_buf[4], uncompressed_size_32); + UInt32 size_compressed = codec->compress(uncompressed_buf.data(), source_size, compressed_buf.data()); uncompressed_buf.swap(compressed_buf); source_size = size_compressed; } - memcpy(&dest[2 + codecs.size()], &uncompressed_buf[0], source_size); + //std::cerr << "(compress) BUF_SIZE_COMPRESSED:" << source_size << std::endl; - return 2 + codecs.size() + source_size; + memcpy(&dest[1 + codecs.size()], uncompressed_buf.data(), source_size); + + //std::cerr << "(compress) COMPRESSING BUF:\n"; + //for (size_t i = 0; i < source_size + 1 + codecs.size(); ++i) + // std::cerr << getHexUIntLowercase(+dest[i]) << " "; + //std::cerr << std::endl; + + return 1 + codecs.size() + source_size; } -size_t CompressionCodecMultiple::decompress(char * source, size_t source_size, char * dest, size_t decompressed_size) +void CompressionCodecMultiple::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 decompressed_size) const { + UInt8 compression_methods_size = source[0]; - static constexpr size_t header_for_size_store = sizeof(UInt32) + sizeof(UInt32); + //std::cerr << "(decompress) DECOMPRESSING BUF:\n"; + //for (size_t i = 0; i < source_size; ++i) + // std::cerr << getHexUIntLowercase(+source[i]) << " "; + //std::cerr << std::endl; - if (source[0] != getMethodByte()) - throw Exception("Incorrect compression method for codec multiple, given " + toString(source[0]) + ", expected " + toString(getMethodByte()), - ErrorCodes::UNKNOWN_CODEC); - - UInt8 compression_methods_size = source[1]; - PODArray compressed_buf; + //std::cerr << "(decompress) BUF_SIZE_COMPRESSED:" << source_size << std::endl; + //std::cerr << "(decompress) CODECS SIZE:" << +compression_methods_size << std::endl; + PODArray compressed_buf(&source[compression_methods_size + 1], &source[source_size]); PODArray uncompressed_buf; /// Insert all data into compressed buf - compressed_buf.insert(&source[compression_methods_size + 2], &source[source_size]); + source_size -= (compression_methods_size + 1); for (long idx = compression_methods_size - 1; idx >= 0; --idx) { - UInt8 compression_method = source[idx + 2]; + UInt8 compression_method = source[idx + 1]; const auto codec = CompressionCodecFactory::instance().get(compression_method); - UInt32 compressed_size = unalignedLoad(&compressed_buf[0]); - UInt32 uncompressed_size = unalignedLoad(&compressed_buf[4]); + UInt32 uncompressed_size = ICompressionCodec::readDecompressedBlockSize(compressed_buf.data()); + //std::cerr << "(decompress) UNCOMPRESSED SIZE READ:" << uncompressed_size << std::endl; + if (idx == 0 && uncompressed_size != decompressed_size) throw Exception("Wrong final decompressed size in codec Multiple, got " + toString(uncompressed_size) + ", expected " + toString(decompressed_size), ErrorCodes::CORRUPTED_DATA); + uncompressed_buf.resize(uncompressed_size); - codec->decompress(&compressed_buf[header_for_size_store], compressed_size - header_for_size_store, &uncompressed_buf[0], uncompressed_size); + codec->decompress(compressed_buf.data(), source_size, uncompressed_buf.data()); uncompressed_buf.swap(compressed_buf); + source_size = uncompressed_size; } memcpy(dest, compressed_buf.data(), decompressed_size); - return decompressed_size; } void registerCodecMultiple(CompressionCodecFactory & factory) { - factory.registerSimpleCompressionCodec("Multiple", static_cast(CompressionMethodByte::Multiple), [&](){ + factory.registerSimpleCompressionCodec("Multiple", static_cast(CompressionMethodByte::Multiple), [&](){ return std::make_shared(); }); } diff --git a/dbms/src/Compression/CompressionCodecMultiple.h b/dbms/src/Compression/CompressionCodecMultiple.h index cea37956ffb..f80f5bd7a6f 100644 --- a/dbms/src/Compression/CompressionCodecMultiple.h +++ b/dbms/src/Compression/CompressionCodecMultiple.h @@ -11,15 +11,16 @@ public: CompressionCodecMultiple() = default; explicit CompressionCodecMultiple(Codecs codecs); - char getMethodByte() override; + UInt8 getMethodByte() const override; - void getCodecDesc(String & codec_desc) override; + String getCodecDesc() const override; - size_t compress(char * source, size_t source_size, char * dest) override; + UInt32 getCompressedDataSize(UInt32 uncompressed_size) const override; - size_t getCompressedReserveSize(size_t uncompressed_size) override; +protected: + UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; - size_t decompress(char *source, size_t source_size, char *dest, size_t decompressed_size) override; + void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; private: Codecs codecs; diff --git a/dbms/src/Compression/CompressionCodecNone.cpp b/dbms/src/Compression/CompressionCodecNone.cpp index 133d4d69c85..e090c09f9c1 100644 --- a/dbms/src/Compression/CompressionCodecNone.cpp +++ b/dbms/src/Compression/CompressionCodecNone.cpp @@ -6,26 +6,25 @@ namespace DB { -char CompressionCodecNone::getMethodByte() +UInt8 CompressionCodecNone::getMethodByte() const { - return static_cast(CompressionMethodByte::NONE); + return static_cast(CompressionMethodByte::NONE); } -void CompressionCodecNone::getCodecDesc(String & codec_desc) +String CompressionCodecNone::getCodecDesc() const { - codec_desc = "NONE"; + return "NONE"; } -size_t CompressionCodecNone::compress(char * source, size_t source_size, char * dest) +UInt32 CompressionCodecNone::doCompressData(const char * source, UInt32 source_size, char * dest) const { memcpy(dest, source, source_size); return source_size; } -size_t CompressionCodecNone::decompress(char * source, size_t /*source_size*/, char * dest, size_t size_decompressed) +void CompressionCodecNone::doDecompressData(const char * source, UInt32 /*source_size*/, char * dest, UInt32 uncompressed_size) const { - memcpy(dest, source, size_decompressed); - return size_decompressed; + memcpy(dest, source, uncompressed_size); } void registerCodecNone(CompressionCodecFactory & factory) diff --git a/dbms/src/Compression/CompressionCodecNone.h b/dbms/src/Compression/CompressionCodecNone.h index 89aeb218e10..ab3f1176734 100644 --- a/dbms/src/Compression/CompressionCodecNone.h +++ b/dbms/src/Compression/CompressionCodecNone.h @@ -11,13 +11,15 @@ namespace DB class CompressionCodecNone : public ICompressionCodec { public: - char getMethodByte() override; + UInt8 getMethodByte() const override; - void getCodecDesc(String & codec_desc) override; + String getCodecDesc() const override; - size_t compress(char * source, size_t source_size, char * compressed_buf) override; +protected: + UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; + + void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; - size_t decompress(char *source, size_t source_size, char *dest, size_t decompressed_size) override; }; -} \ No newline at end of file +} diff --git a/dbms/src/Compression/CompressionCodecZSTD.cpp b/dbms/src/Compression/CompressionCodecZSTD.cpp index cff70973d99..b42d2f54620 100644 --- a/dbms/src/Compression/CompressionCodecZSTD.cpp +++ b/dbms/src/Compression/CompressionCodecZSTD.cpp @@ -21,22 +21,23 @@ namespace ErrorCodes extern const int ILLEGAL_CODEC_PARAMETER; } -char CompressionCodecZSTD::getMethodByte() +UInt8 CompressionCodecZSTD::getMethodByte() const { - return static_cast(CompressionMethodByte::ZSTD); + return static_cast(CompressionMethodByte::ZSTD); } -void CompressionCodecZSTD::getCodecDesc(String & codec_desc) +String CompressionCodecZSTD::getCodecDesc() const { - codec_desc = "ZSTD"; + return "ZSTD"; } -size_t CompressionCodecZSTD::getCompressedReserveSize(size_t uncompressed_size) +UInt32 CompressionCodecZSTD::getCompressedDataSize(UInt32 uncompressed_size) const { return ZSTD_compressBound(uncompressed_size); } -size_t CompressionCodecZSTD::compress(char * source, size_t source_size, char * dest) + +UInt32 CompressionCodecZSTD::doCompressData(const char * source, UInt32 source_size, char * dest) const { size_t compressed_size = ZSTD_compress(dest, ZSTD_compressBound(source_size), source, source_size, level); @@ -46,18 +47,17 @@ size_t CompressionCodecZSTD::compress(char * source, size_t source_size, char * return compressed_size; } -size_t CompressionCodecZSTD::decompress(char * source, size_t source_size, char * dest, size_t size_decompressed) + +void CompressionCodecZSTD::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const { - size_t res = ZSTD_decompress(dest, size_decompressed, source, source_size); + size_t res = ZSTD_decompress(dest, uncompressed_size, source, source_size); if (ZSTD_isError(res)) throw Exception("Cannot ZSTD_decompress: " + std::string(ZSTD_getErrorName(res)), ErrorCodes::CANNOT_DECOMPRESS); - - return size_decompressed; } -CompressionCodecZSTD::CompressionCodecZSTD(int level) - :level(level) +CompressionCodecZSTD::CompressionCodecZSTD(int level_) + :level(level_) { } diff --git a/dbms/src/Compression/CompressionCodecZSTD.h b/dbms/src/Compression/CompressionCodecZSTD.h index 9f6e3cd7497..ea2c1a9b17a 100644 --- a/dbms/src/Compression/CompressionCodecZSTD.h +++ b/dbms/src/Compression/CompressionCodecZSTD.h @@ -11,20 +11,21 @@ namespace DB class CompressionCodecZSTD : public ICompressionCodec { public: - CompressionCodecZSTD(int level); + CompressionCodecZSTD(int level_); - char getMethodByte() override; + UInt8 getMethodByte() const override; - void getCodecDesc(String & codec_desc) override; + String getCodecDesc() const override; - size_t compress(char * source, size_t source_size, char * dest) override; + UInt32 getCompressedDataSize(UInt32 uncompressed_size) const override; - size_t getCompressedReserveSize(size_t uncompressed_size) override; +protected: + UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; - size_t decompress(char *source, size_t source_size, char *dest, size_t decompressed_size) override; + void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; private: int level; }; -} \ No newline at end of file +} diff --git a/dbms/src/Compression/ICompressionCodec.cpp b/dbms/src/Compression/ICompressionCodec.cpp index 09b16b43b96..df1efacf0cc 100644 --- a/dbms/src/Compression/ICompressionCodec.cpp +++ b/dbms/src/Compression/ICompressionCodec.cpp @@ -28,97 +28,147 @@ namespace ErrorCodes extern const int SEEK_POSITION_OUT_OF_BOUND; } -CompressionCodecReadBufferPtr ICompressionCodec::liftCompressed(ReadBuffer & origin) +static constexpr auto CHECKSUM_SIZE{sizeof(CityHash_v1_0_2::uint128)}; + +UInt32 ICompressionCodec::compress(char * source, UInt32 source_size, char * dest) const { - return std::make_shared(origin); + dest[0] = getMethodByte(); + UInt8 header_size = getHeaderSize(); + /// Write data from header_size + UInt32 compressed_bytes_written = doCompressData(source, source_size, &dest[header_size]); + unalignedStore(&dest[1], compressed_bytes_written + header_size); + unalignedStore(&dest[5], source_size); + return header_size + compressed_bytes_written; } -CompressionCodecWriteBufferPtr ICompressionCodec::liftCompressed(WriteBuffer & origin) + +UInt32 ICompressionCodec::decompress(char * source, UInt32 source_size, char * dest) const { - return std::make_shared(*this, origin); + UInt8 method = source[0]; + if (method != getMethodByte()) + throw Exception("Can't decompress data with codec byte " + toString(method) + " from codec with byte " + toString(method), ErrorCodes::CANNOT_DECOMPRESS); + + UInt8 header_size = getHeaderSize(); + UInt32 decompressed_size = unalignedLoad(&source[5]); + doDecompressData(&source[header_size], source_size - header_size, dest, decompressed_size); + return decompressed_size; + } -CompressionCodecReadBuffer::CompressionCodecReadBuffer(ReadBuffer & origin) - : origin(origin) +UInt32 ICompressionCodec::readCompressedBlockSize(const char * source) +{ + return unalignedLoad(&source[1]); +} + + +UInt32 ICompressionCodec::readDecompressedBlockSize(const char * source) +{ + return unalignedLoad(&source[5]); +} + + +UInt8 ICompressionCodec::readMethod(const char * source) +{ + return static_cast(source[0]); +} + +CompressionCodecReadBufferPtr liftCompressed(CompressionCodecPtr codec, ReadBuffer & origin) +{ + return std::make_shared(codec, origin); +} + +CompressionCodecWriteBufferPtr liftCompressed(CompressionCodecPtr codec, WriteBuffer & origin) +{ + return std::make_shared(codec, origin); +} + +CompressionCodecReadBuffer::CompressionCodecReadBuffer(CompressionCodecPtr codec_, ReadBuffer & origin_) + : codec(codec_) + , origin(origin_) { } + /// Read compressed data into compressed_buffer. Get size of decompressed data from block header. Checksum if need. /// Returns number of compressed bytes read. -size_t CompressionCodecReadBuffer::readCompressedData(size_t & size_decompressed, size_t & size_compressed) +std::pair CompressionCodecReadBuffer::readCompressedData() { if (origin.eof()) - return 0; + return std::make_pair(0, 0); CityHash_v1_0_2::uint128 checksum; - origin.readStrict(reinterpret_cast(&checksum), sizeof(checksum)); + origin.readStrict(reinterpret_cast(&checksum), CHECKSUM_SIZE); - own_compressed_buffer.resize(COMPRESSED_BLOCK_HEADER_SIZE); - origin.readStrict(own_compressed_buffer.data(), COMPRESSED_BLOCK_HEADER_SIZE); + UInt8 header_size = ICompressionCodec::getHeaderSize(); + own_compressed_buffer.resize(header_size); + origin.readStrict(own_compressed_buffer.data(), header_size); - size_compressed = unalignedLoad(&own_compressed_buffer[1]); - size_decompressed = unalignedLoad(&own_compressed_buffer[5]); + UInt8 method = ICompressionCodec::readMethod(own_compressed_buffer.data()); - if (size_compressed > DBMS_MAX_COMPRESSED_SIZE) - throw Exception("Too large size_compressed: " + toString(size_compressed) + ". Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED); + if (method != codec->getMethodByte()) + throw Exception("Can't decompress with method " + getHexUIntLowercase(method) + ", with codec " + getHexUIntLowercase(codec->getMethodByte()), ErrorCodes::CANNOT_DECOMPRESS); - ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed + sizeof(checksum)); + UInt32 size_to_read_compressed = ICompressionCodec::readCompressedBlockSize(own_compressed_buffer.data()); + UInt32 size_decompressed = ICompressionCodec::readDecompressedBlockSize(own_compressed_buffer.data()); + + if (size_to_read_compressed > DBMS_MAX_COMPRESSED_SIZE) + throw Exception("Too large size_to_read_compressed: " + toString(size_to_read_compressed) + ". Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED); + + ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_to_read_compressed + CHECKSUM_SIZE); /// Is whole compressed block located in 'origin' buffer? - if (origin.offset() >= COMPRESSED_BLOCK_HEADER_SIZE && - origin.position() + size_compressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER - COMPRESSED_BLOCK_HEADER_SIZE <= origin.buffer().end()) + if (origin.offset() >= header_size && + origin.position() + size_to_read_compressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER - header_size <= origin.buffer().end()) { - origin.position() -= COMPRESSED_BLOCK_HEADER_SIZE; + origin.position() -= header_size; compressed_buffer = origin.position(); - origin.position() += size_compressed; + origin.position() += size_to_read_compressed; } else { - own_compressed_buffer.resize(size_compressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER); + own_compressed_buffer.resize(size_to_read_compressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER); compressed_buffer = own_compressed_buffer.data(); - origin.readStrict(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, size_compressed - COMPRESSED_BLOCK_HEADER_SIZE); + origin.readStrict(compressed_buffer + header_size, size_to_read_compressed - header_size); } - auto checksum_calculated = CityHash_v1_0_2::CityHash128(compressed_buffer, size_compressed); + auto checksum_calculated = CityHash_v1_0_2::CityHash128(compressed_buffer, size_to_read_compressed); if (checksum != checksum_calculated) throw Exception("Checksum doesn't match: corrupted data." " Reference: " + getHexUIntLowercase(checksum.first) + getHexUIntLowercase(checksum.second) + ". Actual: " + getHexUIntLowercase(checksum_calculated.first) + getHexUIntLowercase(checksum_calculated.second) - + ". Size of compressed block: " + toString(size_compressed) + ".", + + ". Size of compressed block: " + toString(size_to_read_compressed), ErrorCodes::CHECKSUM_DOESNT_MATCH); - return size_compressed + sizeof(checksum); + return std::make_pair(size_to_read_compressed, size_decompressed); } -void CompressionCodecReadBuffer::decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum) +void CompressionCodecReadBuffer::decompress(char * to, UInt32 size_compressed) { - ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks); - ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed); + UInt8 method = ICompressionCodec::readMethod(compressed_buffer); - UInt8 current_method = compressed_buffer[0]; /// See CompressedWriteBuffer.h - if (!codec || current_method != method) - { - method = current_method; + if (!codec) codec = CompressionCodecFactory::instance().get(method); - } + else if (codec->getMethodByte() != method) + throw Exception("Can't decompress data with byte " + getHexUIntLowercase(method) + " expected byte " + getHexUIntLowercase(codec->getMethodByte()), ErrorCodes::CANNOT_DECOMPRESS); - codec->decompress(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, - size_compressed_without_checksum - COMPRESSED_BLOCK_HEADER_SIZE, to, size_decompressed); + codec->decompress(compressed_buffer, size_compressed, to); } bool CompressionCodecReadBuffer::nextImpl() { - size_t size_decompressed; - size_t size_compressed_without_checksum; + UInt32 size_decompressed; - size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum); - if (!size_compressed) + std::tie(read_compressed_bytes_for_last_time, size_decompressed) = readCompressedData(); + if (!read_compressed_bytes_for_last_time) return false; memory.resize(size_decompressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER); working_buffer = Buffer(memory.data(), &memory[size_decompressed]); - decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum); + decompress(working_buffer.begin(), read_compressed_bytes_for_last_time); + + ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed); + ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks); return true; } @@ -127,16 +177,19 @@ void CompressionCodecReadBuffer::seek(size_t offset_in_compressed_file, size_t o { if (const auto file_in = dynamic_cast(&origin)) { - if (size_compressed && - offset_in_compressed_file == file_in->getPositionInFile() - size_compressed && - offset_in_decompressed_block <= working_buffer.size()) + UInt32 readed_size_with_checksum = read_compressed_bytes_for_last_time + CHECKSUM_SIZE; + UInt32 last_readed_block_start_pos = file_in->getPositionInFile() - readed_size_with_checksum; + /// We seek in already uncompressed block + if (readed_size_with_checksum && /// we already have read something + offset_in_compressed_file == last_readed_block_start_pos && /// our position is exactly at required byte + offset_in_decompressed_block <= working_buffer.size()) /// our buffer size is more, than required position in uncompressed block { bytes += offset(); pos = working_buffer.begin() + offset_in_decompressed_block; /// `bytes` can overflow and get negative, but in `count()` everything will overflow back and get right. bytes -= offset(); } - else + else /// or we have to read and uncompress further { file_in->seek(offset_in_compressed_file); @@ -173,26 +226,18 @@ void CompressionCodecWriteBuffer::nextImpl() if (!offset()) return; - static constexpr size_t header_size = 1 + sizeof(UInt32) + sizeof(UInt32); + size_t decompressed_size = offset(); + UInt32 compressed_reserve_size = codec->getCompressedReserveSize(decompressed_size); + compressed_buffer.resize(compressed_reserve_size); + UInt32 compressed_size = codec->compress(working_buffer.begin(), decompressed_size, compressed_buffer.data()); - size_t uncompressed_size = offset(); - size_t compressed_reserve_size = compression_codec.getCompressedReserveSize(uncompressed_size); - - compressed_buffer.resize(header_size + compressed_reserve_size); - compressed_buffer[0] = compression_codec.getMethodByte(); - size_t compressed_size = header_size + compression_codec.compress(working_buffer.begin(), uncompressed_size, &compressed_buffer[header_size]); - - UInt32 compressed_size_32 = compressed_size; - UInt32 uncompressed_size_32 = uncompressed_size; - unalignedStore(&compressed_buffer[1], compressed_size_32); - unalignedStore(&compressed_buffer[5], uncompressed_size_32); CityHash_v1_0_2::uint128 checksum = CityHash_v1_0_2::CityHash128(compressed_buffer.data(), compressed_size); - out.write(reinterpret_cast(&checksum), sizeof(checksum)); + out.write(reinterpret_cast(&checksum), CHECKSUM_SIZE); out.write(compressed_buffer.data(), compressed_size); } -CompressionCodecWriteBuffer::CompressionCodecWriteBuffer(ICompressionCodec & compression_codec, WriteBuffer & out, size_t buf_size) - : BufferWithOwnMemory(buf_size), out(out), compression_codec(compression_codec) +CompressionCodecWriteBuffer::CompressionCodecWriteBuffer(CompressionCodecPtr codec_, WriteBuffer & out_, size_t buf_size) + : BufferWithOwnMemory(buf_size), out(out_), codec(codec_) { } diff --git a/dbms/src/Compression/ICompressionCodec.h b/dbms/src/Compression/ICompressionCodec.h index 9e818ef969a..9a2babf527f 100644 --- a/dbms/src/Compression/ICompressionCodec.h +++ b/dbms/src/Compression/ICompressionCodec.h @@ -28,44 +28,45 @@ using CompressionCodecWriteBufferPtr = std::shared_ptr { public: - CompressionCodecWriteBuffer(ICompressionCodec & compression_codec, WriteBuffer & out, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE); + CompressionCodecWriteBuffer(CompressionCodecPtr codec_, WriteBuffer & out_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE); ~CompressionCodecWriteBuffer() override; private: void nextImpl() override; - private: WriteBuffer & out; - ICompressionCodec & compression_codec; + CompressionCodecPtr codec; PODArray compressed_buffer; }; class CompressionCodecReadBuffer : public BufferWithOwnMemory { + + UInt32 read_compressed_bytes_for_last_time = 0; + public: - size_t size_compressed = 0; - size_t size_decompressed = 0; + std::pair readCompressedData(); - CompressionCodecReadBuffer(ReadBuffer & origin); + void decompress(char * to, UInt32 size_compressed); - size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed); - - void decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum); + CompressionCodecReadBuffer(CompressionCodecPtr codec_, ReadBuffer & origin_); void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block); - private: + CompressionCodecPtr codec; ReadBuffer & origin; char * compressed_buffer; - UInt8 method; - CompressionCodecPtr codec; PODArray own_compressed_buffer; bool nextImpl() override; }; +CompressionCodecReadBufferPtr liftCompressed(CompressionCodecPtr codec, ReadBuffer & origin); + +CompressionCodecWriteBufferPtr liftCompressed(CompressionCodecPtr codec, WriteBuffer & origin); + /** * */ @@ -74,20 +75,31 @@ class ICompressionCodec : private boost::noncopyable public: virtual ~ICompressionCodec() = default; - CompressionCodecReadBufferPtr liftCompressed(ReadBuffer & origin); + virtual UInt8 getMethodByte() const = 0; - CompressionCodecWriteBufferPtr liftCompressed(WriteBuffer & origin); + virtual String getCodecDesc() const = 0; - virtual char getMethodByte() = 0; + virtual UInt32 compress(char * source, UInt32 source_size, char * dest) const; - /// TODO(alesap) FIXME - virtual void getCodecDesc(String & codec_desc) = 0; + virtual UInt32 decompress(char * source, UInt32 source_size, char * dest) const; - virtual size_t compress(char * source, size_t source_size, char * dest) = 0; + virtual UInt32 getCompressedReserveSize(UInt32 uncompressed_size) const { return getHeaderSize() + getCompressedDataSize(uncompressed_size); } - virtual size_t decompress(char * source, size_t source_size, char * dest, size_t decompressed_size) = 0; + static UInt8 getHeaderSize() { return 1 + 8; } - virtual size_t getCompressedReserveSize(size_t uncompressed_size) { return uncompressed_size; } + static UInt32 readCompressedBlockSize(const char * source); + + static UInt32 readDecompressedBlockSize(const char * source); + + static UInt8 readMethod(const char * source); + +protected: + + virtual UInt32 getCompressedDataSize(UInt32 uncompressed_size) const { return uncompressed_size; } + + virtual UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const = 0; + + virtual void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const = 0; }; } diff --git a/dbms/src/IO/CachedCompressedReadBuffer.cpp b/dbms/src/IO/CachedCompressedReadBuffer.cpp index bc36ed38afd..3eb74bb070d 100644 --- a/dbms/src/IO/CachedCompressedReadBuffer.cpp +++ b/dbms/src/IO/CachedCompressedReadBuffer.cpp @@ -20,7 +20,7 @@ void CachedCompressedReadBuffer::initInput() if (!file_in) { file_in = createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size); - in = codec->liftCompressed(*file_in); + in = liftCompressed(codec, *file_in); if (profile_callback) file_in->setProfileCallback(profile_callback, clock_type); @@ -42,17 +42,16 @@ bool CachedCompressedReadBuffer::nextImpl() owned_cell = std::make_shared(); - size_t size_decompressed; - size_t size_compressed_without_checksum; - owned_cell->compressed_size = in->readCompressedData(size_decompressed, size_compressed_without_checksum); + UInt32 size_decompressed; + std::tie(owned_cell->compressed_size, size_decompressed) = in->readCompressedData(); if (owned_cell->compressed_size) { owned_cell->data.resize(size_decompressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER); - in->decompress(owned_cell->data.data(), size_decompressed, size_compressed_without_checksum); + in->decompress(owned_cell->data.data(), owned_cell->compressed_size); in->buffer() = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER); - in->decompress(owned_cell->data.data(), size_decompressed, size_compressed_without_checksum); + in->decompress(owned_cell->data.data(), owned_cell->compressed_size); /// Put data into cache. cache->set(key, owned_cell); @@ -74,9 +73,9 @@ bool CachedCompressedReadBuffer::nextImpl() CachedCompressedReadBuffer::CachedCompressedReadBuffer( - const std::string & path_, UncompressedCache * cache_, const CompressionCodecPtr & codec, + const std::string & path_, UncompressedCache * cache_, const CompressionCodecPtr & codec_, size_t estimated_size_, size_t aio_threshold_, size_t buf_size_) - : ReadBuffer(nullptr, 0), path(path_), cache(cache_), codec(codec), buf_size(buf_size_), estimated_size(estimated_size_), + : ReadBuffer(nullptr, 0), path(path_), cache(cache_), codec(codec_), buf_size(buf_size_), estimated_size(estimated_size_), aio_threshold(aio_threshold_), file_pos(0) { } diff --git a/dbms/src/IO/CachedCompressedReadBuffer.h b/dbms/src/IO/CachedCompressedReadBuffer.h index 2aaaaa21a1c..f0b2b725b10 100644 --- a/dbms/src/IO/CachedCompressedReadBuffer.h +++ b/dbms/src/IO/CachedCompressedReadBuffer.h @@ -45,7 +45,7 @@ private: public: CachedCompressedReadBuffer( - const std::string & path_, UncompressedCache * cache_, const CompressionCodecPtr & codec, + const std::string & path_, UncompressedCache * cache_, const CompressionCodecPtr & codec_, size_t estimated_size_, size_t aio_threshold_, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE); diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index 2b27aa11b7b..1f90d8048c0 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -371,8 +371,7 @@ ASTPtr InterpreterCreateQuery::formatColumns(const ColumnsDescription & columns) const auto ct = columns.codecs.find(column.name); if (ct != std::end(columns.codecs)) { - String codec_desc; - ct->second->getCodecDesc(codec_desc); + String codec_desc = ct->second->getCodecDesc(); codec_desc = "CODEC(" + codec_desc + ")"; auto pos = codec_desc.data(); const auto end = pos + codec_desc.size(); diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp index 1b7b3c75982..39e3bb3c725 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp @@ -252,7 +252,7 @@ MergeTreeReader::Stream::Stream( if (profile_callback) file_in->setProfileCallback(profile_callback, clock_type); - const auto compressed_buffer = codec->liftCompressed(*file_in); + const auto compressed_buffer = liftCompressed(codec, *file_in); non_cached_buffer = compressed_buffer; data_buffer = non_cached_buffer.get(); } diff --git a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp index 5bf356201ee..6d2d5c132b5 100644 --- a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -192,7 +192,7 @@ IMergedBlockOutputStream::ColumnStream::ColumnStream( data_file_extension{data_file_extension_}, marks_file_extension{marks_file_extension_}, plain_file(createWriteBufferFromFileBase(data_path + data_file_extension, estimated_size, aio_threshold, max_compress_block_size)), - plain_hashing(*plain_file), compressed_buf(compression_codec->liftCompressed(plain_hashing)), compressed(*compressed_buf.get()), + plain_hashing(*plain_file), compressed_buf(liftCompressed(compression_codec, plain_hashing)), compressed(*compressed_buf.get()), marks_file(marks_path + marks_file_extension, 4096, O_TRUNC | O_CREAT | O_WRONLY), marks(marks_file) { } diff --git a/dbms/tests/queries/0_stateless/00804_test_custom_compression_codecs.reference b/dbms/tests/queries/0_stateless/00804_test_custom_compression_codecs.reference index 4832f7f06b6..27037ff5641 100644 --- a/dbms/tests/queries/0_stateless/00804_test_custom_compression_codecs.reference +++ b/dbms/tests/queries/0_stateless/00804_test_custom_compression_codecs.reference @@ -11,3 +11,5 @@ 9175437371954010821 1.5555555555555 hello world! [77] ['John'] 7.1000000000000 xxxxxxxxxxxx [127] ['Henry'] +! +222 diff --git a/dbms/tests/queries/0_stateless/00804_test_custom_compression_codecs.sql b/dbms/tests/queries/0_stateless/00804_test_custom_compression_codecs.sql index e1860a8702c..c7d927c30b3 100644 --- a/dbms/tests/queries/0_stateless/00804_test_custom_compression_codecs.sql +++ b/dbms/tests/queries/0_stateless/00804_test_custom_compression_codecs.sql @@ -49,19 +49,19 @@ INSERT INTO test.compression_codec_multiple VALUES (1, 'world', toDate('2018-10- SELECT * FROM test.compression_codec_multiple ORDER BY id; -INSERT INTO test.compression_codec_multiple select modulo(number, 100), toString(number), toDate('2018-12-01'), 5.5 * number from system.numbers limit 10000; +INSERT INTO test.compression_codec_multiple select modulo(number, 100), toString(number), toDate('2018-12-01'), 5.5 * number FROM system.numbers limit 10000; -SELECT count(*) from test.compression_codec_multiple; +SELECT count(*) FROM test.compression_codec_multiple; -SELECT count(distinct data) from test.compression_codec_multiple; +SELECT count(distinct data) FROM test.compression_codec_multiple; -SELECT floor(sum(somenum), 1) from test.compression_codec_multiple; +SELECT floor(sum(somenum), 1) FROM test.compression_codec_multiple; TRUNCATE TABLE test.compression_codec_multiple; -INSERT INTO test.compression_codec_multiple select modulo(number, 100), toString(number), toDate('2018-12-01'), 5.5 * number from system.numbers limit 10000; +INSERT INTO test.compression_codec_multiple select modulo(number, 100), toString(number), toDate('2018-12-01'), 5.5 * number FROM system.numbers limit 10000; -SELECT sum(cityHash64(*)) from test.compression_codec_multiple; +SELECT sum(cityHash64(*)) FROM test.compression_codec_multiple; DROP TABLE IF EXISTS test.compression_codec_multiple_more_types; @@ -75,3 +75,22 @@ INSERT INTO test.compression_codec_multiple_more_types VALUES(1.5555555555555, ' INSERT INTO test.compression_codec_multiple_more_types VALUES(7.1, 'xxxxxxxxxxxx', [127], ['Henry']); SELECT * FROM test.compression_codec_multiple_more_types order by id; + +DROP TABLE IF EXISTS test.compression_codec_multiple_with_key; + +CREATE TABLE test.compression_codec_multiple_with_key ( + somedate Date CODEC(ZSTD, ZSTD, ZSTD(12)), + id UInt64 CODEC(LZ4, ZSTD, NONE), + data String CODEC(ZSTD(2), NONE, LZ4, LZ4) +) ENGINE = MergeTree() PARTITION BY somedate ORDER BY id SETTINGS index_granularity = 2; + + +INSERT INTO test.compression_codec_multiple_with_key VALUES(toDate('2018-10-12'), 100000, 'hello'), (toDate('2018-10-12'), 100002, 'world'), (toDate('2018-10-12'), 1111, '!'); + +SELECT data FROM test.compression_codec_multiple_with_key WHERE id BETWEEN 3 AND 1112; + +INSERT INTO test.compression_codec_multiple_with_key SELECT toDate('2018-10-12'), number, toString(number) FROM system.numbers LIMIT 1000; + +SELECT COUNT(DISTINCT data) FROM test.compression_codec_multiple_with_key WHERE id < 222; + +DROP TABLE IF EXISTS test.compression_codec_multiple_with_key;