Create fewer compressed streams while writing compact parts

This commit is contained in:
Anton Popov 2020-09-04 01:04:46 +03:00
parent 98011f6fc3
commit d5da58918e
20 changed files with 110 additions and 15 deletions

View File

@ -36,6 +36,12 @@ ASTPtr CompressionCodecDelta::getCodecDesc() const
return makeASTFunction("Delta", literal);
}
void CompressionCodecDelta::updateHash(SipHash & hash) const
{
    /// The codec's identity is its AST description plus the configured delta byte width.
    const auto codec_ast = getCodecDesc();
    codec_ast->updateTreeHash(hash);
    hash.update(delta_bytes_size);
}
namespace
{

View File

@ -14,7 +14,10 @@ public:
ASTPtr getCodecDesc() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;

View File

@ -339,6 +339,12 @@ ASTPtr CompressionCodecDoubleDelta::getCodecDesc() const
return std::make_shared<ASTIdentifier>("DoubleDelta");
}
void CompressionCodecDoubleDelta::updateHash(SipHash & hash) const
{
    /// Mix in the AST description and the configured data byte width;
    /// both affect how the codec encodes data.
    const auto codec_ast = getCodecDesc();
    codec_ast->updateTreeHash(hash);
    hash.update(data_bytes_size);
}
UInt32 CompressionCodecDoubleDelta::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
const auto result = 2 // common header

View File

@ -100,7 +100,10 @@ public:
ASTPtr getCodecDesc() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;

View File

@ -254,6 +254,12 @@ ASTPtr CompressionCodecGorilla::getCodecDesc() const
return std::make_shared<ASTIdentifier>("Gorilla");
}
void CompressionCodecGorilla::updateHash(SipHash & hash) const
{
    /// Identity = AST description + configured data byte width.
    const auto codec_ast = getCodecDesc();
    codec_ast->updateTreeHash(hash);
    hash.update(data_bytes_size);
}
UInt32 CompressionCodecGorilla::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
const auto result = 2 // common header

View File

@ -97,7 +97,10 @@ public:
ASTPtr getCodecDesc() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;

View File

@ -35,6 +35,11 @@ ASTPtr CompressionCodecLZ4::getCodecDesc() const
return std::make_shared<ASTIdentifier>("LZ4");
}
void CompressionCodecLZ4::updateHash(SipHash & hash) const
{
    /// No extra parameters are mixed in here; the AST description alone
    /// identifies this codec.
    const auto codec_ast = getCodecDesc();
    codec_ast->updateTreeHash(hash);
}
UInt32 CompressionCodecLZ4::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
return LZ4_COMPRESSBOUND(uncompressed_size);

View File

@ -18,6 +18,8 @@ public:
UInt32 getAdditionalSizeAtTheEndOfBuffer() const override { return LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER; }
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;

View File

@ -37,6 +37,12 @@ ASTPtr CompressionCodecMultiple::getCodecDesc() const
return result;
}
void CompressionCodecMultiple::updateHash(SipHash & hash) const
{
    /// Fold in the hash of every nested codec. Iteration order matters:
    /// the same codecs in a different order yield a different combined hash.
    for (const auto & codec : codecs)
        codec->updateHash(hash);
}
UInt32 CompressionCodecMultiple::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
UInt32 compressed_size = uncompressed_size;

View File

@ -19,7 +19,10 @@ public:
static std::vector<uint8_t> getCodecsBytesFromData(const char * source);
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 decompressed_size) const override;

View File

@ -17,6 +17,11 @@ ASTPtr CompressionCodecNone::getCodecDesc() const
return std::make_shared<ASTIdentifier>("NONE");
}
void CompressionCodecNone::updateHash(SipHash & hash) const
{
    /// NONE is parameterless, so the AST description fully identifies it.
    const auto codec_ast = getCodecDesc();
    codec_ast->updateTreeHash(hash);
}
UInt32 CompressionCodecNone::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
memcpy(dest, source, source_size);

View File

@ -15,7 +15,10 @@ public:
ASTPtr getCodecDesc() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;

View File

@ -646,6 +646,13 @@ ASTPtr CompressionCodecT64::getCodecDesc() const
return makeASTFunction("T64", literal);
}
void CompressionCodecT64::updateHash(SipHash & hash) const
{
    /// Mix in the AST description plus the codec's internal state
    /// (type index and variant), since they change the encoding.
    getCodecDesc()->updateTreeHash(hash);
    hash.update(type_idx);
    hash.update(variant);
}
void registerCodecT64(CompressionCodecFactory & factory)
{
auto reg_func = [&](const ASTPtr & arguments, DataTypePtr type) -> CompressionCodecPtr

View File

@ -35,6 +35,8 @@ public:
ASTPtr getCodecDesc() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * src, UInt32 src_size, char * dst) const override;
void doDecompressData(const char * src, UInt32 src_size, char * dst, UInt32 uncompressed_size) const override;

View File

@ -32,6 +32,11 @@ ASTPtr CompressionCodecZSTD::getCodecDesc() const
return makeASTFunction("ZSTD", literal);
}
void CompressionCodecZSTD::updateHash(SipHash & hash) const
{
    /// The AST description (which includes the level literal, per getCodecDesc)
    /// is the only state mixed into the hash here.
    const auto codec_ast = getCodecDesc();
    codec_ast->updateTreeHash(hash);
}
UInt32 CompressionCodecZSTD::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
return ZSTD_compressBound(uncompressed_size);

View File

@ -21,7 +21,10 @@ public:
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;

View File

@ -35,6 +35,13 @@ ASTPtr ICompressionCodec::getFullCodecDesc() const
return result;
}
UInt64 ICompressionCodec::getHash() const
{
    /// Convenience wrapper: fold the codec's identity (see updateHash overrides)
    /// into a single 64-bit value, e.g. for deduplicating equivalent codecs.
    SipHash state;
    updateHash(state);
    return state.get64();
}
UInt32 ICompressionCodec::compress(const char * source, UInt32 source_size, char * dest) const
{
assert(source != nullptr && dest != nullptr);

View File

@ -5,6 +5,7 @@
#include <Compression/CompressionInfo.h>
#include <Core/Types.h>
#include <Parsers/IAST.h>
#include <Common/SipHash.h>
namespace DB
@ -36,6 +37,10 @@ public:
/// "CODEC(LZ4,LZ4HC(5))"
ASTPtr getFullCodecDesc() const;
/// Hash, that depends on codec ast and optional parameters like data type
virtual void updateHash(SipHash & hash) const = 0;
UInt64 getHash() const;
/// Compressed bytes from uncompressed source to dest. Dest should preallocate memory
UInt32 compress(const char * source, UInt32 source_size, char * dest) const;

View File

@ -29,9 +29,18 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
, marks(*marks_file)
{
const auto & storage_columns = metadata_snapshot->getColumns();
/// Create compressed stream for every different codec.
std::unordered_map<UInt64, CompressedStreamPtr> streams_by_codec;
for (const auto & column : columns_list)
compressed_streams[column.name] = std::make_unique<CompressedStream>(
plain_hashing, storage_columns.getCodecOrDefault(column.name, default_codec));
{
auto codec = storage_columns.getCodecOrDefault(column.name, default_codec);
auto & stream = streams_by_codec[codec->getHash()];
if (!stream)
stream = std::make_shared<CompressedStream>(plain_hashing, codec);
compressed_streams.push_back(stream);
}
}
void MergeTreeDataPartWriterCompact::write(
@ -101,14 +110,15 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
if (rows_to_write)
data_written = true;
for (const auto & column : columns_list)
auto name_and_type = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++name_and_type)
{
auto & stream = compressed_streams[column.name];
auto & stream = compressed_streams[i];
writeIntBinary(plain_hashing.count(), marks);
writeIntBinary(stream->hashing_buf.offset(), marks);
writeColumnSingleGranule(block.getByName(column.name), current_row, rows_to_write);
writeColumnSingleGranule(block.getByName(name_and_type->name), stream, current_row, rows_to_write);
/// Write one compressed block per column in granule for more optimal reading.
stream->hashing_buf.next();
@ -133,12 +143,15 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
next_mark = from_mark;
}
void MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWithTypeAndName & column, size_t from_row, size_t number_of_rows) const
void MergeTreeDataPartWriterCompact::writeColumnSingleGranule(
const ColumnWithTypeAndName & column,
const CompressedStreamPtr & stream,
size_t from_row, size_t number_of_rows) const
{
IDataType::SerializeBinaryBulkStatePtr state;
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.getter = [this, &column](IDataType::SubstreamPath) -> WriteBuffer * { return &compressed_streams.at(column.name)->hashing_buf; };
serialize_settings.getter = [&stream](IDataType::SubstreamPath) -> WriteBuffer * { return &stream->hashing_buf; };
serialize_settings.position_independent_encoding = true;
serialize_settings.low_cardinality_max_dictionary_size = 0;
@ -213,7 +226,7 @@ void MergeTreeDataPartWriterCompact::addToChecksums(MergeTreeDataPartChecksums &
size_t uncompressed_size = 0;
CityHash_v1_0_2::uint128 uncompressed_hash{0, 0};
for (const auto & [_, stream] : compressed_streams)
for (const auto & stream : compressed_streams)
{
uncompressed_size += stream->hashing_buf.count();
auto stream_hash = stream->hashing_buf.getHash();

View File

@ -26,12 +26,6 @@ protected:
void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) override;
private:
/// Write single granule of one column (rows between 2 marks)
void writeColumnSingleGranule(
const ColumnWithTypeAndName & column,
size_t from_row,
size_t number_of_rows) const;
void writeBlock(const Block & block);
void addToChecksums(MergeTreeDataPartChecksums & checksumns);
@ -67,11 +61,19 @@ private:
: compressed_buf(buf, codec), hashing_buf(compressed_buf) {}
};
std::unordered_map<String, std::unique_ptr<CompressedStream>> compressed_streams;
using CompressedStreamPtr = std::shared_ptr<CompressedStream>;
std::vector<CompressedStreamPtr> compressed_streams;
/// marks -> marks_file
std::unique_ptr<WriteBufferFromFileBase> marks_file;
HashingWriteBuffer marks;
/// Write single granule of one column (rows between 2 marks)
void writeColumnSingleGranule(
const ColumnWithTypeAndName & column,
const CompressedStreamPtr & stream,
size_t from_row,
size_t number_of_rows) const;
};
}