From d5da58918e8245652269381df4e70eed346be2b8 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 4 Sep 2020 01:04:46 +0300 Subject: [PATCH] create less compressed streams while writing compact parts --- src/Compression/CompressionCodecDelta.cpp | 6 ++++ src/Compression/CompressionCodecDelta.h | 3 ++ .../CompressionCodecDoubleDelta.cpp | 6 ++++ src/Compression/CompressionCodecDoubleDelta.h | 3 ++ src/Compression/CompressionCodecGorilla.cpp | 6 ++++ src/Compression/CompressionCodecGorilla.h | 3 ++ src/Compression/CompressionCodecLZ4.cpp | 5 ++++ src/Compression/CompressionCodecLZ4.h | 2 ++ src/Compression/CompressionCodecMultiple.cpp | 6 ++++ src/Compression/CompressionCodecMultiple.h | 3 ++ src/Compression/CompressionCodecNone.cpp | 5 ++++ src/Compression/CompressionCodecNone.h | 3 ++ src/Compression/CompressionCodecT64.cpp | 7 +++++ src/Compression/CompressionCodecT64.h | 2 ++ src/Compression/CompressionCodecZSTD.cpp | 5 ++++ src/Compression/CompressionCodecZSTD.h | 3 ++ src/Compression/ICompressionCodec.cpp | 7 +++++ src/Compression/ICompressionCodec.h | 5 ++++ .../MergeTreeDataPartWriterCompact.cpp | 29 ++++++++++++++----- .../MergeTreeDataPartWriterCompact.h | 16 +++++----- 20 files changed, 110 insertions(+), 15 deletions(-) diff --git a/src/Compression/CompressionCodecDelta.cpp b/src/Compression/CompressionCodecDelta.cpp index 51bd19f646b..dc866e527d6 100644 --- a/src/Compression/CompressionCodecDelta.cpp +++ b/src/Compression/CompressionCodecDelta.cpp @@ -36,6 +36,12 @@ ASTPtr CompressionCodecDelta::getCodecDesc() const return makeASTFunction("Delta", literal); } +void CompressionCodecDelta::updateHash(SipHash & hash) const +{ + getCodecDesc()->updateTreeHash(hash); + hash.update(delta_bytes_size); +} + namespace { diff --git a/src/Compression/CompressionCodecDelta.h b/src/Compression/CompressionCodecDelta.h index 5c3979e063e..a192fab051a 100644 --- a/src/Compression/CompressionCodecDelta.h +++ b/src/Compression/CompressionCodecDelta.h @@ -14,7 +14,10 @@ public: ASTPtr getCodecDesc() const override; + void updateHash(SipHash & hash) const override; + protected: + UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; diff --git a/src/Compression/CompressionCodecDoubleDelta.cpp b/src/Compression/CompressionCodecDoubleDelta.cpp index 157e2df1a3f..dd2e95a916d 100644 --- a/src/Compression/CompressionCodecDoubleDelta.cpp +++ b/src/Compression/CompressionCodecDoubleDelta.cpp @@ -339,6 +339,12 @@ ASTPtr CompressionCodecDoubleDelta::getCodecDesc() const return std::make_shared("DoubleDelta"); } +void CompressionCodecDoubleDelta::updateHash(SipHash & hash) const +{ + getCodecDesc()->updateTreeHash(hash); + hash.update(data_bytes_size); +} + UInt32 CompressionCodecDoubleDelta::getMaxCompressedDataSize(UInt32 uncompressed_size) const { const auto result = 2 // common header diff --git a/src/Compression/CompressionCodecDoubleDelta.h b/src/Compression/CompressionCodecDoubleDelta.h index a2690d24414..30ef086077d 100644 --- a/src/Compression/CompressionCodecDoubleDelta.h +++ b/src/Compression/CompressionCodecDoubleDelta.h @@ -100,7 +100,10 @@ public: ASTPtr getCodecDesc() const override; + void updateHash(SipHash & hash) const override; + protected: + UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; diff --git a/src/Compression/CompressionCodecGorilla.cpp b/src/Compression/CompressionCodecGorilla.cpp index 042835f4a32..3d08734fe91 100644 --- a/src/Compression/CompressionCodecGorilla.cpp +++ b/src/Compression/CompressionCodecGorilla.cpp @@ -254,6 +254,12 @@ ASTPtr CompressionCodecGorilla::getCodecDesc() const return std::make_shared("Gorilla"); } +void CompressionCodecGorilla::updateHash(SipHash & hash) const +{ + getCodecDesc()->updateTreeHash(hash); + hash.update(data_bytes_size); +} + UInt32 CompressionCodecGorilla::getMaxCompressedDataSize(UInt32 uncompressed_size) const { const auto result = 2 // common header diff --git a/src/Compression/CompressionCodecGorilla.h b/src/Compression/CompressionCodecGorilla.h index 523add0700f..df0f329dc31 100644 --- a/src/Compression/CompressionCodecGorilla.h +++ b/src/Compression/CompressionCodecGorilla.h @@ -97,7 +97,10 @@ public: ASTPtr getCodecDesc() const override; + void updateHash(SipHash & hash) const override; + protected: + UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; diff --git a/src/Compression/CompressionCodecLZ4.cpp b/src/Compression/CompressionCodecLZ4.cpp index cf3622cd702..1370349d68d 100644 --- a/src/Compression/CompressionCodecLZ4.cpp +++ b/src/Compression/CompressionCodecLZ4.cpp @@ -35,6 +35,11 @@ ASTPtr CompressionCodecLZ4::getCodecDesc() const return std::make_shared("LZ4"); } +void CompressionCodecLZ4::updateHash(SipHash & hash) const +{ + getCodecDesc()->updateTreeHash(hash); +} + UInt32 CompressionCodecLZ4::getMaxCompressedDataSize(UInt32 uncompressed_size) const { return LZ4_COMPRESSBOUND(uncompressed_size); diff --git a/src/Compression/CompressionCodecLZ4.h b/src/Compression/CompressionCodecLZ4.h index 2f19af08185..229e25481e6 100644 --- a/src/Compression/CompressionCodecLZ4.h +++ b/src/Compression/CompressionCodecLZ4.h @@ -18,6 +18,8 @@ public: UInt32 getAdditionalSizeAtTheEndOfBuffer() const override { return LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER; } + void updateHash(SipHash & hash) const override; + protected: UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; diff --git a/src/Compression/CompressionCodecMultiple.cpp b/src/Compression/CompressionCodecMultiple.cpp index 868df90825e..77f0fc132fe 100644 --- a/src/Compression/CompressionCodecMultiple.cpp +++ b/src/Compression/CompressionCodecMultiple.cpp @@ -37,6 +37,12 @@ ASTPtr CompressionCodecMultiple::getCodecDesc() const return result; } +void CompressionCodecMultiple::updateHash(SipHash & hash) const +{ + for (const auto & codec : codecs) + codec->updateHash(hash); +} + UInt32 CompressionCodecMultiple::getMaxCompressedDataSize(UInt32 uncompressed_size) const { UInt32 compressed_size = uncompressed_size; diff --git a/src/Compression/CompressionCodecMultiple.h b/src/Compression/CompressionCodecMultiple.h index cd50d3250e3..6bac189bdf7 100644 --- a/src/Compression/CompressionCodecMultiple.h +++ b/src/Compression/CompressionCodecMultiple.h @@ -19,7 +19,10 @@ public: static std::vector getCodecsBytesFromData(const char * source); + void updateHash(SipHash & hash) const override; + protected: + UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 decompressed_size) const override; diff --git a/src/Compression/CompressionCodecNone.cpp b/src/Compression/CompressionCodecNone.cpp index 50c19b2b547..f727c4b4860 100644 --- a/src/Compression/CompressionCodecNone.cpp +++ b/src/Compression/CompressionCodecNone.cpp @@ -17,6 +17,11 @@ ASTPtr CompressionCodecNone::getCodecDesc() const return std::make_shared("NONE"); } +void CompressionCodecNone::updateHash(SipHash & hash) const +{ + getCodecDesc()->updateTreeHash(hash); +} + UInt32 CompressionCodecNone::doCompressData(const char * source, UInt32 source_size, char * dest) const { memcpy(dest, source, source_size); diff --git a/src/Compression/CompressionCodecNone.h b/src/Compression/CompressionCodecNone.h index ed604063198..370ef301694 100644 --- a/src/Compression/CompressionCodecNone.h +++ b/src/Compression/CompressionCodecNone.h @@ -15,7 +15,10 @@ public: ASTPtr getCodecDesc() const override; + void updateHash(SipHash & hash) const override; + protected: + UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; diff --git a/src/Compression/CompressionCodecT64.cpp b/src/Compression/CompressionCodecT64.cpp index 16462e50ebd..30972a5fe1f 100644 --- a/src/Compression/CompressionCodecT64.cpp +++ b/src/Compression/CompressionCodecT64.cpp @@ -646,6 +646,13 @@ ASTPtr CompressionCodecT64::getCodecDesc() const return makeASTFunction("T64", literal); } +void CompressionCodecT64::updateHash(SipHash & hash) const +{ + getCodecDesc()->updateTreeHash(hash); + hash.update(type_idx); + hash.update(variant); +} + void registerCodecT64(CompressionCodecFactory & factory) { auto reg_func = [&](const ASTPtr & arguments, DataTypePtr type) -> CompressionCodecPtr diff --git a/src/Compression/CompressionCodecT64.h b/src/Compression/CompressionCodecT64.h index 11efbea0955..9671eb81ce1 100644 --- a/src/Compression/CompressionCodecT64.h +++ b/src/Compression/CompressionCodecT64.h @@ -35,6 +35,8 @@ public: ASTPtr getCodecDesc() const override; + void updateHash(SipHash & hash) const override; + protected: UInt32 doCompressData(const char * src, UInt32 src_size, char * dst) const override; void doDecompressData(const char * src, UInt32 src_size, char * dst, UInt32 uncompressed_size) const override; diff --git a/src/Compression/CompressionCodecZSTD.cpp b/src/Compression/CompressionCodecZSTD.cpp index ab48580533e..3b317884fec 100644 --- a/src/Compression/CompressionCodecZSTD.cpp +++ b/src/Compression/CompressionCodecZSTD.cpp @@ -32,6 +32,11 @@ ASTPtr CompressionCodecZSTD::getCodecDesc() const return makeASTFunction("ZSTD", literal); } +void CompressionCodecZSTD::updateHash(SipHash & hash) const +{ + getCodecDesc()->updateTreeHash(hash); +} + UInt32 CompressionCodecZSTD::getMaxCompressedDataSize(UInt32 uncompressed_size) const { return ZSTD_compressBound(uncompressed_size); diff --git a/src/Compression/CompressionCodecZSTD.h b/src/Compression/CompressionCodecZSTD.h index 2ad893083c3..3bfb6bb1d4d 100644 --- a/src/Compression/CompressionCodecZSTD.h +++ b/src/Compression/CompressionCodecZSTD.h @@ -21,7 +21,10 @@ public: UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override; + void updateHash(SipHash & hash) const override; + protected: + UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; diff --git a/src/Compression/ICompressionCodec.cpp b/src/Compression/ICompressionCodec.cpp index 4aafc298658..5de015b2680 100644 --- a/src/Compression/ICompressionCodec.cpp +++ b/src/Compression/ICompressionCodec.cpp @@ -35,6 +35,13 @@ ASTPtr ICompressionCodec::getFullCodecDesc() const return result; } +UInt64 ICompressionCodec::getHash() const +{ + SipHash hash; + updateHash(hash); + return hash.get64(); +} + UInt32 ICompressionCodec::compress(const char * source, UInt32 source_size, char * dest) const { assert(source != nullptr && dest != nullptr); diff --git a/src/Compression/ICompressionCodec.h b/src/Compression/ICompressionCodec.h index fa1f73ce4dd..8f72ba55200 100644 --- a/src/Compression/ICompressionCodec.h +++ b/src/Compression/ICompressionCodec.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB @@ -36,6 +37,10 @@ public: /// "CODEC(LZ4,LZ4HC(5))" ASTPtr getFullCodecDesc() const; + /// Hash, that depends on codec ast and optional parameters like data type + virtual void updateHash(SipHash & hash) const = 0; + UInt64 getHash() const; + /// Compressed bytes from uncompressed source to dest. Dest should preallocate memory UInt32 compress(const char * source, UInt32 source_size, char * dest) const; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp index 5e311e0a2f9..70beaec5e5e 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp @@ -29,9 +29,18 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact( , marks(*marks_file) { const auto & storage_columns = metadata_snapshot->getColumns(); + + /// Create compressed stream for every different codec. + std::unordered_map streams_by_codec; for (const auto & column : columns_list) - compressed_streams[column.name] = std::make_unique( - plain_hashing, storage_columns.getCodecOrDefault(column.name, default_codec)); + { + auto codec = storage_columns.getCodecOrDefault(column.name, default_codec); + auto & stream = streams_by_codec[codec->getHash()]; + if (!stream) + stream = std::make_shared(plain_hashing, codec); + + compressed_streams.push_back(stream); + } } void MergeTreeDataPartWriterCompact::write( @@ -101,14 +110,15 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block) if (rows_to_write) data_written = true; - for (const auto & column : columns_list) + auto name_and_type = columns_list.begin(); + for (size_t i = 0; i < columns_list.size(); ++i, ++name_and_type) { - auto & stream = compressed_streams[column.name]; + auto & stream = compressed_streams[i]; writeIntBinary(plain_hashing.count(), marks); writeIntBinary(stream->hashing_buf.offset(), marks); - writeColumnSingleGranule(block.getByName(column.name), current_row, rows_to_write); + writeColumnSingleGranule(block.getByName(name_and_type->name), stream, current_row, rows_to_write); /// Write one compressed block per column in granule for more optimal reading. stream->hashing_buf.next(); @@ -133,12 +143,15 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block) next_mark = from_mark; } -void MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWithTypeAndName & column, size_t from_row, size_t number_of_rows) const +void MergeTreeDataPartWriterCompact::writeColumnSingleGranule( + const ColumnWithTypeAndName & column, + const CompressedStreamPtr & stream, + size_t from_row, size_t number_of_rows) const { IDataType::SerializeBinaryBulkStatePtr state; IDataType::SerializeBinaryBulkSettings serialize_settings; - serialize_settings.getter = [this, &column](IDataType::SubstreamPath) -> WriteBuffer * { return &compressed_streams.at(column.name)->hashing_buf; }; + serialize_settings.getter = [&stream](IDataType::SubstreamPath) -> WriteBuffer * { return &stream->hashing_buf; }; serialize_settings.position_independent_encoding = true; serialize_settings.low_cardinality_max_dictionary_size = 0; @@ -213,7 +226,7 @@ void MergeTreeDataPartWriterCompact::addToChecksums(MergeTreeDataPartChecksums & size_t uncompressed_size = 0; CityHash_v1_0_2::uint128 uncompressed_hash{0, 0}; - for (const auto & [_, stream] : compressed_streams) + for (const auto & stream : compressed_streams) { uncompressed_size += stream->hashing_buf.count(); auto stream_hash = stream->hashing_buf.getHash(); diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h index 6206d6e867e..2f24c515fb3 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h @@ -26,12 +26,6 @@ protected: void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) override; private: - /// Write single granule of one column (rows between 2 marks) - void writeColumnSingleGranule( - const ColumnWithTypeAndName & column, - size_t from_row, - size_t number_of_rows) const; - void writeBlock(const Block & block); void addToChecksums(MergeTreeDataPartChecksums & checksumns); @@ -67,11 +61,19 @@ private: : compressed_buf(buf, codec), hashing_buf(compressed_buf) {} }; - std::unordered_map> compressed_streams; + using CompressedStreamPtr = std::shared_ptr; + std::vector compressed_streams; /// marks -> marks_file std::unique_ptr marks_file; HashingWriteBuffer marks; + + /// Write single granule of one column (rows between 2 marks) + void writeColumnSingleGranule( + const ColumnWithTypeAndName & column, + const CompressedStreamPtr & stream, + size_t from_row, + size_t number_of_rows) const; }; }